tests/testthat/test-dbFetch.R

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

context("dbFetch")

source("utilities.R")

test_that("dbFetch works with live database", {
  conn <- setup_live_connection()

  result <- dbSendQuery(conn, "SELECT 1 AS n")
  expect_error(
    dbFetch(result, 1),
    ".*fetching custom number of rows.*is not supported.*"
  )
  expect_equal(dbFetch(result, -1), tibble::tibble(n = 1))
  expect_true(dbHasCompleted(result))

  result <- dbSendQuery(conn, "SELECT 2 AS n")
  expect_true(dbClearResult(result))
  expect_error(
    dbFetch(result, -1),
    ".*Result object is not valid.*"
  )

  result <- dbSendQuery(conn, "SELECT 3 AS n LIMIT 0")
  rv <- dbFetch(result, -1)
  expect_equal(rv, tibble::tibble(n = 1L)[FALSE, , drop = FALSE])
})

test_that("dbFetch works with mock", {
  conn <- setup_mock_connection()
  with_mock(
    `httr::POST` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "QUEUED",
        request_body =
          "^SELECT n FROM \\(VALUES \\(1\\), \\(2\\)\\) AS t \\(n\\)$",
        next_uri = "http://localhost:8000/query_1/1",
        query_id = "query_1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "RUNNING",
        request_body = "SELECT 2",
        next_uri = "http://localhost:8000/query_2/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "RUNNING",
        request_body = "SELECT 3",
        next_uri = "http://localhost:8000/query_3/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "FINISHED",
        request_body = "SELECT 4 LIMIT 0",
        next_uri = "http://localhost:8000/query_4/1"
      )
    ),
    `httr::GET` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/query_1/1",
        status_code = 200,
        data = tibble::tibble(n = 1),
        state = "FINISHED",
        next_uri = "http://localhost:8000/query_1/2"
      ),
      mock_httr_response(
        "http://localhost:8000/query_1/2",
        status_code = 200,
        data = tibble::tibble(n = 2),
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_2/1",
        status_code = 400,
        data = "Broken URL",
        state = "FAILED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_3/1",
        status_code = 200,
        data = "Failed URL",
        state = "FAILED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/1",
        status_code = 200,
        data = tibble::tibble("_col0" = 4)[FALSE, , drop = FALSE],
        state = "FINISHED"
      )
    ),
    `httr::DELETE` = mock_httr_replies(
      mock_httr_response(
        url = "http://localhost:8000/v1/query/query_1",
        status_code = 200,
        state = ""
      )
    ),
    `httr::handle_reset` = function(...) {
      return()
    },
    {
      result <- dbSendQuery(conn, "SELECT n FROM (VALUES (1), (2)) AS t (n)")
      expect_error(
        dbFetch(result, 1),
        ".*fetching custom number of rows.*is not supported.*"
      )
      expect_equal(dbFetch(result), tibble::tibble(n = 1L))
      expect_true(dbClearResult(result))
      expect_error(
        dbFetch(result),
        ".*Result object is not valid.*"
      )

      result <- dbSendQuery(conn, "SELECT n FROM (VALUES (1), (2)) AS t (n)")
      expect_equal(dbGetRowCount(result), 0)
      expect_equal(dbFetch(result), tibble::tibble(n = 1L))
      expect_equal(dbGetRowCount(result), 1)
      expect_equal(dbFetch(result), tibble::tibble(n = 2L))
      expect_equal(dbGetRowCount(result), 2)
      expect_true(dbHasCompleted(result))

      result <- dbSendQuery(conn, "SELECT n FROM (VALUES (1), (2)) AS t (n)")
      expect_equal(dbFetch(result, -1), tibble::tibble(n = c(1L, 2L)))
      expect_equal(dbGetRowCount(result), 2)

      result <- dbSendQuery(conn, "SELECT 2")
      expect_error(
        dbFetch(result),
        paste0(
          "Cannot fetch .*, error: ",
          "There was a problem with the request and we have exhausted ",
          "our retry limit"
        )
      )

      result <- dbSendQuery(conn, "SELECT 3")
      expect_error(
        dbFetch(result),
        ".*Query .*localhost.8000.query.3.1 failed.*"
      )

      result <- dbSendQuery(conn, "SELECT 4 LIMIT 0")
      rv <- dbFetch(result, -1)
      ev <- tibble::tibble("_col0" = numeric(0))
      expect_equal_data_frame(rv, ev)
    }
  )
  with_mock(
    `httr::POST` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "RUNNING",
        request_body = "SELECT 3",
        next_uri = "http://localhost:8000/query_3/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "RUNNING",
        request_body = "SELECT 4",
        next_uri = "http://localhost:8000/query_4/1"
      )
    ),
    `httr::GET` = function(url, ...) {
      if (url == "http://localhost:8000/query_3/1") {
        stop("Error")
      }
      if (request.count == 0) {
        request.count <<- 1
        stop("First request is an error")
      }
      return(mock_httr_replies(
        mock_httr_response(
          "http://localhost:8000/query_4/1",
          status_code = 200,
          state = "FINISHED",
          data = tibble::tibble(n = 4)
        )
      )(url, ...))
    },
    `httr::handle_reset` = function(...) {
      return()
    },
    {
      result <- dbSendQuery(conn, "SELECT 3")
      expect_error(
        suppressMessages(dbFetch(result)),
        paste0(
          "Cannot fetch .*, error: ",
          "There was a problem with the request and we have exhausted our ",
          "retry limit"
        )
      )

      assign("request.count", 0, envir = environment(httr::GET))
      result <- dbSendQuery(conn, "SELECT 4")
      expect_message(
        v <- dbFetch(result),
        "First request is an error"
      )
      expect_equal(v, tibble::tibble(n = 4))
    }
  )
})

with_locale(test.locale(), test_that)("dbFetch rbind works correctly", {
  conn <- setup_mock_connection()

  with_mock(
    `httr::POST` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "FINISHED",
        request_body = "SELECT \\* FROM all_types",
        next_uri = "http://localhost:8000/query_1/1"
      )
    ),
    `httr::GET` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/query_1/1",
        status_code = 200,
        next_uri = "http://localhost:8000/query_1/2",
        data = data.frame.with.all.classes(1),
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_1/2",
        status_code = 200,
        data = data.frame.with.all.classes(2),
        state = "FINISHED"
      )
    ),
    {
      result <- dbSendQuery(conn, "SELECT * FROM all_types")

      x <- dbFetch(result, -1)
      y <- data.frame.with.all.classes()
      y$POSIXct <-
        lubridate::with_tz(y$POSIXct, test.timezone())
      nms <- names(y$list_named[[1]])
      y$list_named[[1]] <- purrr::flatten_dbl(y$list_named[[1]])
      names(y$list_named[[1]]) <- nms
      y$list_named[[2]] <- numeric(0)
      y$list_unnamed[[1]] <- purrr::flatten_dbl(y$list_unnamed[[1]])
      y$list_unnamed[[2]] <- numeric(0)
      expect_equal_data_frame(x, y)
    }
  )
})

with_locale(test.locale(), test_that)("dbFetch rbind works with zero row chunks", {
  conn <- setup_mock_connection()
  data <- tibble::tibble(
    integer = c(1L, 2L),
    double = c(3.0, 4),
    logical = c(TRUE, NA)
  )
  full.data <- data.frame.with.all.classes()
  full.data <- full.data[
    ,
    -match(c("raw", "list_unnamed", "list_named"), colnames(full.data)),
    drop = FALSE
  ]
  with_mock(
    `httr::POST` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "FINISHED",
        request_body = "SELECT integer, double, logical FROM all_types",
        next_uri = "http://localhost:8000/query_1/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "PAUSED",
        request_body = "SELECT double FROM all_types",
        next_uri = "http://localhost:8000/query_2/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "PAUSED",
        request_body = "SELECT \\* FROM all_types",
        next_uri = "http://localhost:8000/query_3/1"
      ),
      mock_httr_response(
        "http://localhost:8000/v1/statement",
        status_code = 200,
        state = "QUEUED",
        request_body = "SELECT \\* FROM all_types_with_queue",
        next_uri = "http://localhost:8000/query_4/1"
      )
    ),
    `httr::GET` = mock_httr_replies(
      mock_httr_response(
        "http://localhost:8000/query_1/1",
        status_code = 200,
        next_uri = "http://localhost:8000/query_1/2",
        data = data[1, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_1/2",
        status_code = 200,
        next_uri = "http://localhost:8000/query_1/3",
        data = data[FALSE, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_1/3",
        status_code = 200,
        data = data[2, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_2/1",
        status_code = 200,
        next_uri = "http://localhost:8000/query_2/2",
        data = data[1, "double", drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_2/2",
        status_code = 200,
        next_uri = "http://localhost:8000/query_2/3",
        data = data[FALSE, "double", drop = FALSE],
        state = "RUNNING"
      ),
      mock_httr_response(
        "http://localhost:8000/query_2/3",
        status_code = 200,
        data = data[2, "double", drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_3/1",
        status_code = 200,
        next_uri = "http://localhost:8000/query_3/2",
        data = full.data[FALSE, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_3/2",
        status_code = 200,
        data = full.data,
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/1",
        status_code = 200,
        next_uri = "http://localhost:8000/query_4/2",
        state = "QUEUED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/2",
        status_code = 200,
        next_uri = "http://localhost:8000/query_4/3",
        state = "QUEUED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/3",
        next_uri = "http://localhost:8000/query_4/4",
        status_code = 200,
        data = full.data[1, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/4",
        status_code = 200,
        next_uri = "http://localhost:8000/query_4/5",
        state = "QUEUED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/5",
        next_uri = "http://localhost:8000/query_4/6",
        status_code = 200,
        data = full.data[FALSE, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/6",
        next_uri = "http://localhost:8000/query_4/7",
        status_code = 200,
        data = full.data[2, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/7",
        next_uri = "http://localhost:8000/query_4/8",
        status_code = 200,
        data = full.data[FALSE, , drop = FALSE],
        state = "FINISHED"
      ),
      mock_httr_response(
        "http://localhost:8000/query_4/8",
        status_code = 200,
        state = "FINISHED"
      )
    ),
    {
      result <- dbSendQuery(
        conn,
        "SELECT integer, double, logical FROM all_types"
      )

      expect_equal_data_frame(
        dbFetch(result, -1),
        data,
        label = "multiple columns"
      )

      result <- dbSendQuery(conn, "SELECT double FROM all_types")

      expect_equal_data_frame(
        dbFetch(result, -1),
        data[, "double", drop = FALSE],
        label = "single column"
      )

      result <- dbSendQuery(conn, "SELECT * FROM all_types")
      full.data$POSIXct <-
        lubridate::with_tz(full.data$POSIXct, test.timezone())
      expect_equal_data_frame(
        dbFetch(result, -1),
        full.data,
        label = "zero chunk first"
      )

      result <- dbSendQuery(conn, "SELECT * FROM all_types_with_queue")
      expect_equal_data_frame(
        dbFetch(result, -1),
        full.data,
        label = "with empty chunks"
      )
    }
  )
})

Try the RPresto package in your browser

Any scripts or data that you put into this service are public.

RPresto documentation built on Nov. 2, 2023, 5:58 p.m.