tests/testthat/test-arkdb.R

context("basic")

skip_if_not_installed("dbplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("RSQLite")

# Setup
tmp <- tempdir()
db <- dbplyr::nycflights13_sqlite(tmp)
dir <- file.path(tmp, "nycflights")
dir.create(dir, showWarnings = FALSE)
dbdir <- fs::path(tmp, "local.sqlite")
#new_db <- DBI::dbConnect(RSQLite::SQLite(), dbdir)
new_db <- local_db()
## Note: later tests will overwrite existing tables and files, throwing warnings

test_that("we can ark and unark a db", {
  
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_on_os("solaris")

  ark(db, dir, lines = 50000, overwrite = TRUE)


  suppressWarnings(
    myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )

  ## unark
  # files <- fs::dir_ls(dir, glob = "*.tsv.bz2")
  files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
  expect_length(files, 5)
  unark(files, new_db, lines = 50000, overwrite = TRUE)

  myflights <- dplyr::tbl(new_db, "flights")
  expect_is(myflights, "tbl_dbi")

  myflights <- dplyr::collect(myflights)
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )
})


context("plain-txt")

test_that("we can ark and unark a db in plain text", {

  skip_on_os("solaris")
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")

  ark(db, dir, lines = 50000, compress = "none", overwrite = TRUE)

  # files <- fs::dir_ls(dir, glob = "*.tsv")
  files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
  expect_length(files, 5)

  suppressWarnings( # ignore overwrite warning
    myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )

  ## unark
  suppressWarnings( # ignore overwrite warning
    unark(files, new_db, lines = 50000, overwrite = TRUE)
  )
  myflights <- dplyr::tbl(new_db, "flights")
  expect_is(myflights, "tbl_dbi")

  myflights <- dplyr::collect(myflights)
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )
})


context("alternate method")

test_that("alternate method for ark", {
  
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_on_os("solaris")
  
  expect_warning( 
    ark(db, dir, lines = 50000, method = "window", overwrite = TRUE),
    "overwriting"
  )

  suppressWarnings(
    myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )


  ## unark
  files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
  expect_warning(
    unark(files, new_db, lines = 50000, overwrite = TRUE),
    "overwriting"
  )

  myflights <- dplyr::tbl(new_db, "flights")
  expect_is(myflights, "tbl_dbi")
  myflights <- dplyr::collect(myflights)
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )
})

context("MonetDB")
test_that("try with MonetDB & alternate method", {
  
  skip_if_not_installed("MonetDBLite")
  skip_if_not_installed("dplyr")
  skip_if_not_installed("fs")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_on_os("solaris")
  skip_on_cran()
  
  ## SETUP, with text files. (Could skip as these now exist from above tests)
  data <- list(
    airlines = nycflights13::airlines,
    airports = nycflights13::airports,
    flights = nycflights13::flights
  )
  sink <- lapply(names(data), function(x) {
    readr::write_tsv(data[[x]], fs::path(dir, paste0(x, ".tsv")))
  })

  files <- fs::dir_ls(dir, glob = "*.tsv")

  # test unark on alternate DB
  monet_dir <- fs::dir_create(fs::path(tmp, "monet"))
  monet_db <- DBI::dbConnect(MonetDBLite::MonetDBLite(), monet_dir)
  unark(files, monet_db, lines = 50000, overwrite = TRUE)

  flights <- dplyr::tbl(monet_db, "flights")
  expect_equal(dim(flights)[[2]], 19)
  expect_is(flights, "tbl_dbi")

  ## clean out the text files
  unlink(dir, TRUE) # ark'd text files
  dir <- fs::dir_create(fs::path(tmp, "nycflights"))

  ## Test has_between
  expect_false(arkdb:::has_between(monet_db, "airlines"))

  #### Test ark ######
  ark(monet_db, dir, lines = 50000L, method = "window", overwrite = TRUE)

  ## test ark results
  suppressWarnings(
    myflights <- readr::read_tsv(fs::path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )

  disconnect(monet_db)
  unlink(monet_dir, TRUE)
})







context("parquet")

test_that("try with parquet & alternate method", {
  
  
  skip_if_not_installed("arrow") # Arrow installs on solaris, it just doesn't work
  skip_if_not_installed("dplyr")
  skip_if_not_installed("fs")
  skip_if_not_installed("nycflights13")
  skip_on_os("solaris")

  
  # test ark to parquet from sqlite
  ark(
    db,
    dir,
    lines = 25000,
    compress = "none",
    streamable_table = streamable_parquet(),
    method = "keep-open",
    overwrite = TRUE
  )

  # Test that parts are written out appropriately

  # airlines
  expect_equal(
    "part-00001.parquet",
    list.files(paste0(dir, "/", "airlines"))
  )

  expect_equal(
    nrow(arrow::open_dataset(paste0(dir, "/", "airlines"))),
    dbGetQuery(db, "SELECT COUNT(*) FROM airlines")[[1]]
  )

  # airports
  expect_equal(
    "part-00001.parquet",
    list.files(paste0(dir, "/", "airports"))
  )

  expect_equal(
    nrow(arrow::open_dataset(paste0(dir, "/", "airports"))),
    dbGetQuery(db, "SELECT COUNT(*) FROM airports")[[1]]
  )

  # flights
  expect_true("part-00001.parquet" %in% list.files(paste0(dir, "/", "flights")))
  expect_true("part-00014.parquet" %in% list.files(paste0(dir, "/", "flights")))

  expect_length(list.files(paste0(dir, "/", "flights")), 14)

  expect_equal(
    nrow(arrow::open_dataset(paste0(dir, "/", "flights"))),
    dbGetQuery(db, "SELECT COUNT(*) FROM flights")[[1]]
  )

  expect_equal(
    dim(arrow::open_dataset(paste0(dir, "/", "flights"))),
    dim(nycflights13::flights)
  )
  
  # Clean up!
  unlink(paste0(dir, "/flights"), recursive = T, force = T)
})




context("inject filters")

test_that("e2e with filter for flights month = 2: readr tsv", {
  # summary(factor(nycflights13::flights$month)) filter month == 2, 24951 records
  
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_on_os("solaris")
  
  suppressWarnings(file.remove(paste0(dir, "/flights.tsv.bz2")))

  ark(db, dir,
    streamable_table = streamable_readr_tsv(),
    lines = 50000, tables = "flights", overwrite = TRUE,
    filter_statement = "WHERE month = 2"
  )

  r <- read.csv(paste0(dir, "/flights.tsv.bz2"), sep = "\t")
  expect_true(all(r$month == 2))
  expect_true(nrow(r) == nrow(nycflights13::flights[nycflights13::flights$month == 2, ]))
})

# ark and unark parquet ----

test_that("We can ark and unark parquet", {
  skip_if_not_installed("arrow")
  skip_on_cran()
  ark(db, dir,
      streamable_table = streamable_parquet(),
      lines = 50000, tables = "flights", overwrite = TRUE, 
      method = "window", compress = "none"
  )
  
  fls <- list.files(paste0(dir, "/flights"), pattern = "parquet", full.names = T)
  DBI::dbRemoveTable(new_db, "flights")
  unark(fls, new_db, streamable_table = streamable_parquet(), tablenames = "flights", overwrite = TRUE)
  
  expect_equal(
    nrow(DBI::dbReadTable(db, "flights")), 
    nrow(DBI::dbReadTable(new_db, "flights"))
  )
  expect_equal(DBI::dbReadTable(db, "flights")$flight, DBI::dbReadTable(new_db, "flights")$flight)
})


test_that("e2e with filter for flights month = 12: parquet", {
  # summary(factor(nycflights13::flights$month)) filter month = 12, 28132 records

  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("arrow")
  skip_on_os("solaris")
  skip_on_os("windows")
  
  ark(db, dir,
      streamable_table = streamable_parquet(),
      lines = 50000, tables = "flights", overwrite = TRUE,
      filter_statement = "WHERE month = 12", method = "window", compress = "none"
  )
  
  r <- arrow::read_parquet(paste0(dir, "/flights/part-00001.parquet"))
  testthat::expect_true(all(r$month == 12))
  testthat::expect_true(nrow(r) == nrow(nycflights13::flights[nycflights13::flights$month == 12, ]))

  # clean up previous test
  unlink(paste0(dir, "/flights"), TRUE)
  
  # No warning!
  testthat::expect_warning(ark(db, dir,
    streamable_table = streamable_parquet(),
    lines = 20000, tables = "flights", overwrite = TRUE,
    filter_statement = "WHERE month = 12"
  ), "Parquet is already compressed")

  r <- arrow::read_parquet(paste0(dir, "/flights/part-00001.parquet"))

  testthat::expect_equal(length(list.files(paste0(dir, "/flights"))), 2)
  testthat::expect_true(all(r$month == 12))
  testthat::expect_true(nrow(r) == 20000) # expect split into chunks of 20k
})

test_that("Errors on window-parallel and not streamable parquet", {
  expect_error(
    ark(db = function(x) return(new_db), dir, tables = "fake_table", method = "window-parallel"), "only compatible with parquet"
  )
})


test_that("Warns when applying filter to multiple tables", {
  
  skip_on_os("solaris")
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_if_not_installed("arrow")
  
  
  # clean up previous test
  #unlink(paste0(dir, "/flights"), TRUE)
  #unlink(paste0(dir, "/weather"), TRUE)
  
  expect_warning(
    ark(db, dir,
      streamable_table = streamable_parquet(),
      lines = 50000, tables = c("flights", "weather"), overwrite = TRUE,
      filter_statement = "WHERE month = 12", compress = "none"
    ),
    "Your filter statement will be applied to all tables"
  )

  r <- arrow::read_parquet(paste0(dir, "/weather/part-00001.parquet"))
  expect_true(all(r$month == 12))
  expect_true(nrow(r) == nrow(nycflights13::weather[
    nycflights13::weather$month == 12,
  ]))
})


context("callbacks")

test_that("ark with keep-open and callback", {
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_if_not_installed("arrow")
  skip_on_os("solaris")
  
  # Callback to convert arr_delay in the flights
  # data from minutes to hours
  callback <- function(data) {
    data$arr_delay <- as.numeric(data$arr_delay / 60)
    data
  }

  suppressWarnings(
    ark(db, dir, lines = 50000, tables = "flights", method = "keep-open", overwrite = TRUE, callback = callback)
  )


  suppressWarnings(
    myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )

  expect_equal(myflights$arr_delay, nycflights13::flights$arr_delay / 60)
})




test_that("ark with window and callback", {
  
  
  skip_if_not_installed("dplyr")
  skip_if_not_installed("nycflights13")
  skip_if_not_installed("readr")
  skip_on_os("solaris")
  
  # Callback to convert arr_delay in the flights
  # data from minutes to hours
  callback <- function(data) {
    data$arr_delay <- as.numeric(data$arr_delay / 60)
    data
  }

  suppressWarnings(
    ark(db, dir, lines = 50000, tables = "flights", method = "window", overwrite = TRUE, callback = callback)
  )


  suppressWarnings(
    myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
  )
  expect_equal(
    dim(myflights),
    dim(nycflights13::flights)
  )

  expect_equal(myflights$arr_delay, nycflights13::flights$arr_delay / 60)
})


disconnect(db)
disconnect(new_db)
unlink(dir, recursive = TRUE) # ark'd text/parquet files
ropensci/arkdb documentation built on Jan. 27, 2024, 4:47 p.m.