context("basic")
skip_if_not_installed("dbplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("RSQLite")
# Setup
tmp <- tempdir()
db <- dbplyr::nycflights13_sqlite(tmp)
dir <- file.path(tmp, "nycflights")
dir.create(dir, showWarnings = FALSE)
dbdir <- fs::path(tmp, "local.sqlite")
#new_db <- DBI::dbConnect(RSQLite::SQLite(), dbdir)
new_db <- local_db()
## Note: later tests will overwrite existing tables and files, throwing warnings
test_that("we can ark and unark a db", {
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_on_os("solaris")
ark(db, dir, lines = 50000, overwrite = TRUE)
suppressWarnings(
myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
## unark
# files <- fs::dir_ls(dir, glob = "*.tsv.bz2")
files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
expect_length(files, 5)
unark(files, new_db, lines = 50000, overwrite = TRUE)
myflights <- dplyr::tbl(new_db, "flights")
expect_is(myflights, "tbl_dbi")
myflights <- dplyr::collect(myflights)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
})
context("plain-txt")
test_that("we can ark and unark a db in plain text", {
skip_on_os("solaris")
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
ark(db, dir, lines = 50000, compress = "none", overwrite = TRUE)
# files <- fs::dir_ls(dir, glob = "*.tsv")
files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
expect_length(files, 5)
suppressWarnings( # ignore overwrite warning
myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
## unark
suppressWarnings( # ignore overwrite warning
unark(files, new_db, lines = 50000, overwrite = TRUE)
)
myflights <- dplyr::tbl(new_db, "flights")
expect_is(myflights, "tbl_dbi")
myflights <- dplyr::collect(myflights)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
})
context("alternate method")
test_that("alternate method for ark", {
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_on_os("solaris")
expect_warning(
ark(db, dir, lines = 50000, method = "window", overwrite = TRUE),
"overwriting"
)
suppressWarnings(
myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
## unark
files <- list.files(dir, pattern = "[.]tsv.bz2$", full.names = TRUE)
expect_warning(
unark(files, new_db, lines = 50000, overwrite = TRUE),
"overwriting"
)
myflights <- dplyr::tbl(new_db, "flights")
expect_is(myflights, "tbl_dbi")
myflights <- dplyr::collect(myflights)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
})
context("MonetDB")
test_that("try with MonetDB & alternate method", {
skip_if_not_installed("MonetDBLite")
skip_if_not_installed("dplyr")
skip_if_not_installed("fs")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_on_os("solaris")
skip_on_cran()
## SETUP, with text files. (Could skip as these now exist from above tests)
data <- list(
airlines = nycflights13::airlines,
airports = nycflights13::airports,
flights = nycflights13::flights
)
sink <- lapply(names(data), function(x) {
readr::write_tsv(data[[x]], fs::path(dir, paste0(x, ".tsv")))
})
files <- fs::dir_ls(dir, glob = "*.tsv")
# test unark on alternate DB
monet_dir <- fs::dir_create(fs::path(tmp, "monet"))
monet_db <- DBI::dbConnect(MonetDBLite::MonetDBLite(), monet_dir)
unark(files, monet_db, lines = 50000, overwrite = TRUE)
flights <- dplyr::tbl(monet_db, "flights")
expect_equal(dim(flights)[[2]], 19)
expect_is(flights, "tbl_dbi")
## clean out the text files
unlink(dir, TRUE) # ark'd text files
dir <- fs::dir_create(fs::path(tmp, "nycflights"))
## Test has_between
expect_false(arkdb:::has_between(monet_db, "airlines"))
#### Test ark ######
ark(monet_db, dir, lines = 50000L, method = "window", overwrite = TRUE)
## test ark results
suppressWarnings(
myflights <- readr::read_tsv(fs::path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
disconnect(monet_db)
unlink(monet_dir, TRUE)
})
context("parquet")
test_that("try with parquet & alternate method", {
skip_if_not_installed("arrow") # Arrow installs on solaris, it just doesn't work
skip_if_not_installed("dplyr")
skip_if_not_installed("fs")
skip_if_not_installed("nycflights13")
skip_on_os("solaris")
# test ark to parquet from sqlite
ark(
db,
dir,
lines = 25000,
compress = "none",
streamable_table = streamable_parquet(),
method = "keep-open",
overwrite = TRUE
)
# Test that parts are written out appropriately
# airlines
expect_equal(
"part-00001.parquet",
list.files(paste0(dir, "/", "airlines"))
)
expect_equal(
nrow(arrow::open_dataset(paste0(dir, "/", "airlines"))),
dbGetQuery(db, "SELECT COUNT(*) FROM airlines")[[1]]
)
# airports
expect_equal(
"part-00001.parquet",
list.files(paste0(dir, "/", "airports"))
)
expect_equal(
nrow(arrow::open_dataset(paste0(dir, "/", "airports"))),
dbGetQuery(db, "SELECT COUNT(*) FROM airports")[[1]]
)
# flights
expect_true("part-00001.parquet" %in% list.files(paste0(dir, "/", "flights")))
expect_true("part-00014.parquet" %in% list.files(paste0(dir, "/", "flights")))
expect_length(list.files(paste0(dir, "/", "flights")), 14)
expect_equal(
nrow(arrow::open_dataset(paste0(dir, "/", "flights"))),
dbGetQuery(db, "SELECT COUNT(*) FROM flights")[[1]]
)
expect_equal(
dim(arrow::open_dataset(paste0(dir, "/", "flights"))),
dim(nycflights13::flights)
)
# Clean up!
unlink(paste0(dir, "/flights"), recursive = T, force = T)
})
context("inject filters")
test_that("e2e with filter for flights month = 2: readr tsv", {
# summary(factor(nycflights13::flights$month)) filter month == 2, 24951 records
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_on_os("solaris")
suppressWarnings(file.remove(paste0(dir, "/flights.tsv.bz2")))
ark(db, dir,
streamable_table = streamable_readr_tsv(),
lines = 50000, tables = "flights", overwrite = TRUE,
filter_statement = "WHERE month = 2"
)
r <- read.csv(paste0(dir, "/flights.tsv.bz2"), sep = "\t")
expect_true(all(r$month == 2))
expect_true(nrow(r) == nrow(nycflights13::flights[nycflights13::flights$month == 2, ]))
})
# ark and unark parquet ----
test_that("We can ark and unark parquet", {
skip_if_not_installed("arrow")
skip_on_cran()
ark(db, dir,
streamable_table = streamable_parquet(),
lines = 50000, tables = "flights", overwrite = TRUE,
method = "window", compress = "none"
)
fls <- list.files(paste0(dir, "/flights"), pattern = "parquet", full.names = T)
DBI::dbRemoveTable(new_db, "flights")
unark(fls, new_db, streamable_table = streamable_parquet(), tablenames = "flights", overwrite = TRUE)
expect_equal(
nrow(DBI::dbReadTable(db, "flights")),
nrow(DBI::dbReadTable(new_db, "flights"))
)
expect_equal(DBI::dbReadTable(db, "flights")$flight, DBI::dbReadTable(new_db, "flights")$flight)
})
test_that("e2e with filter for flights month = 12: parquet", {
# summary(factor(nycflights13::flights$month)) filter month = 12, 28132 records
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("arrow")
skip_on_os("solaris")
skip_on_os("windows")
ark(db, dir,
streamable_table = streamable_parquet(),
lines = 50000, tables = "flights", overwrite = TRUE,
filter_statement = "WHERE month = 12", method = "window", compress = "none"
)
r <- arrow::read_parquet(paste0(dir, "/flights/part-00001.parquet"))
testthat::expect_true(all(r$month == 12))
testthat::expect_true(nrow(r) == nrow(nycflights13::flights[nycflights13::flights$month == 12, ]))
# clean up previous test
unlink(paste0(dir, "/flights"), TRUE)
# No warning!
testthat::expect_warning(ark(db, dir,
streamable_table = streamable_parquet(),
lines = 20000, tables = "flights", overwrite = TRUE,
filter_statement = "WHERE month = 12"
), "Parquet is already compressed")
r <- arrow::read_parquet(paste0(dir, "/flights/part-00001.parquet"))
testthat::expect_equal(length(list.files(paste0(dir, "/flights"))), 2)
testthat::expect_true(all(r$month == 12))
testthat::expect_true(nrow(r) == 20000) # expect split into chunks of 20k
})
test_that("Errors on window-parallel and not streamable parquet", {
expect_error(
ark(db = function(x) return(new_db), dir, tables = "fake_table", method = "window-parallel"), "only compatible with parquet"
)
})
test_that("Warns when applying filter to multiple tables", {
skip_on_os("solaris")
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_if_not_installed("arrow")
# clean up previous test
#unlink(paste0(dir, "/flights"), TRUE)
#unlink(paste0(dir, "/weather"), TRUE)
expect_warning(
ark(db, dir,
streamable_table = streamable_parquet(),
lines = 50000, tables = c("flights", "weather"), overwrite = TRUE,
filter_statement = "WHERE month = 12", compress = "none"
),
"Your filter statement will be applied to all tables"
)
r <- arrow::read_parquet(paste0(dir, "/weather/part-00001.parquet"))
expect_true(all(r$month == 12))
expect_true(nrow(r) == nrow(nycflights13::weather[
nycflights13::weather$month == 12,
]))
})
context("callbacks")
test_that("ark with keep-open and callback", {
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_if_not_installed("arrow")
skip_on_os("solaris")
# Callback to convert arr_delay in the flights
# data from minutes to hours
callback <- function(data) {
data$arr_delay <- as.numeric(data$arr_delay / 60)
data
}
suppressWarnings(
ark(db, dir, lines = 50000, tables = "flights", method = "keep-open", overwrite = TRUE, callback = callback)
)
suppressWarnings(
myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
expect_equal(myflights$arr_delay, nycflights13::flights$arr_delay / 60)
})
test_that("ark with window and callback", {
skip_if_not_installed("dplyr")
skip_if_not_installed("nycflights13")
skip_if_not_installed("readr")
skip_on_os("solaris")
# Callback to convert arr_delay in the flights
# data from minutes to hours
callback <- function(data) {
data$arr_delay <- as.numeric(data$arr_delay / 60)
data
}
suppressWarnings(
ark(db, dir, lines = 50000, tables = "flights", method = "window", overwrite = TRUE, callback = callback)
)
suppressWarnings(
myflights <- readr::read_tsv(file.path(dir, "flights.tsv.bz2"))
)
expect_equal(
dim(myflights),
dim(nycflights13::flights)
)
expect_equal(myflights$arr_delay, nycflights13::flights$arr_delay / 60)
})
disconnect(db)
disconnect(new_db)
unlink(dir, recursive = TRUE) # ark'd text/parquet files
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.