# (Stray text "Nothing" removed — documentation-site extraction artifact, not R code.)
# Verifies that update_snapshot() correctly creates the very first snapshot of a table
# and that both the file log and the database log record the update as expected.
test_that("update_snapshot() can handle first snapshot", {
  for (conn in get_test_conns()) {
    # Start from a clean slate: drop the target and log tables if they linger from earlier runs
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn))
    expect_false(table_exists(conn, "test.SCDB_tmp1"))
    expect_false(table_exists(conn, "test.SCDB_logs"))

    # Use unmodified mtcars as the initial snapshot
    .data <- mtcars %>%
      dplyr::copy_to(conn, df = ., name = unique_table_name())

    # Configure the logger for this update (file logging to tempdir() AND database logging)
    db_table <- "test.SCDB_tmp1"
    timestamp <- "2022-10-01 09:00:00"
    log_path <- tempdir()

    # Ensure all logs are removed so the file-log assertions below see exactly one log file
    dir(log_path) %>%
      purrr::keep(~ endsWith(., ".log")) %>%
      purrr::walk(~ unlink(file.path(log_path, .)))

    logger <- Logger$new(
      db_table = db_table,
      timestamp = timestamp,
      log_path = log_path,
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # Update
    update_snapshot(.data, conn, db_table, timestamp, logger = logger)

    # Confirm snapshot is transferred correctly (row order is not guaranteed by the
    # database, so both sides are sorted before comparison)
    expect_identical(
      get_table(conn, db_table) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      .data %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec)
    )

    ### For this test, we also check that the log output is correct ###

    # Check file log outputs exists (file name pattern: <date with underscores>.<table id>.log)
    log_pattern <- glue::glue("{stringr::str_replace_all(as.Date(timestamp), '-', '_')}.{id(db_table, conn)}.log")
    log_file <- purrr::keep(dir(log_path), ~ stringr::str_detect(., log_pattern))
    expect_length(log_file, 1)
    expect_gt(file.info(file.path(log_path, log_file))$size, 0)

    # Exactly one database log row is expected, and it must reference the log file.
    # NOTE(review): assumes nrow() yields a count for get_table()'s return value here;
    # nrow() on a lazy tbl is normally NA — confirm get_table()/this check behaves as intended.
    expect_identical(nrow(get_table(conn, "test.SCDB_logs")), 1L)
    db_logs_with_log_file <- get_table(conn, "test.SCDB_logs") %>%
      # `.data` inside dplyr verbs is the tidy-eval pronoun, not the local variable above
      dplyr::filter(!is.na(.data$log_file))
    expect_identical(nrow(db_logs_with_log_file), 1L)

    # Check database log output
    logs <- get_table(conn, "test.SCDB_logs") %>% dplyr::collect()

    # The logs should have specified data types
    types <- c(
      "date" = "POSIXct",
      "catalog" = "character",
      "schema" = "character",
      "table" = "character",
      "n_insertions" = "numeric",
      "n_deactivations" = "numeric",
      "start_time" = "POSIXct",
      "end_time" = "POSIXct",
      "duration" = "character",
      "success" = "logical",
      "message" = "character"
    )

    # SQLite lacks native datetime and logical storage, so relax the expected types
    if (inherits(conn, "SQLiteConnection")) {
      types <- types %>%
        purrr::map_if(~ identical(., "POSIXct"), "character") %>% # SQLite does not support POSIXct
        purrr::map_if(~ identical(., "logical"), "numeric") %>% # SQLite does not support logical
        as.character()
    }
    checkmate::expect_data_frame(logs, nrows = 1, types)

    # Check the content of the log table: the single row must describe this update
    expect_identical(as.character(logs$date), as.character(timestamp))
    db_table_id <- id(db_table, conn)
    # Not every backend exposes a catalog column, hence the conditional check
    if ("catalog" %in% colnames(logs)) expect_identical(logs$catalog, purrr::pluck(db_table_id, "name", "catalog"))
    expect_identical(logs$schema, purrr::pluck(db_table_id, "name", "schema"))
    expect_identical(logs$table, purrr::pluck(db_table_id, "name", "table"))
    # First snapshot: every row of mtcars is inserted, nothing is deactivated
    expect_identical(logs$n_insertions, nrow(mtcars))
    expect_identical(logs$n_deactivations, 0L)
    expect_true(as.logical(logs$success))
    expect_identical(logs$message, NA_character_)

    # Clean up the logs
    unlink(logger$log_realpath)
    close_connection(conn)
  }
})
# Verifies that a second snapshot on a later date coexists with the first:
# slicing the table at either timestamp must reproduce the corresponding data.
test_that("update_snapshot() can add a new snapshot", {
  for (conn in get_test_conns()) {
    # Push a modified version of mtcars to the connection for the second snapshot
    modified <- mtcars %>%
      dplyr::mutate(hp = dplyr::if_else(hp > 130, hp - 10, hp)) %>%
      dplyr::copy_to(conn, df = ., name = unique_table_name())

    # Database-only logging for this update (no file log)
    db_table <- "test.SCDB_tmp1"
    timestamp <- "2022-10-03 09:00:00"
    logger <- Logger$new(
      db_table = db_table,
      timestamp = timestamp,
      log_path = NULL,
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # A simple update where 15 rows are replaced with 15 new ones on the given date
    update_snapshot(modified, conn, db_table, timestamp, logger = logger)

    # Check the snapshot has updated correctly
    snapshot_table <- dplyr::tbl(conn, id(db_table, conn))

    # Slicing at the original timestamp must still reproduce unmodified mtcars
    expect_identical(
      slice_time(snapshot_table, "2022-10-01 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      mtcars %>%
        dplyr::arrange(wt, qsec) %>%
        tibble::as_tibble()
    )
    expect_identical(
      nrow(slice_time(snapshot_table, "2022-10-01 09:00:00")),
      nrow(mtcars)
    )

    # Slicing at the new timestamp must reproduce the modified data
    expect_identical(
      slice_time(snapshot_table, "2022-10-03 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      modified %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec)
    )
    expect_identical(
      nrow(slice_time(snapshot_table, "2022-10-03 09:00:00")),
      nrow(mtcars)
    )

    # The most recent database log row should record 15 insertions and 15 deactivations
    last_log <- get_table(conn, "test.SCDB_logs") %>%
      dplyr::collect() %>%
      utils::tail(1)
    expect_identical(last_log$n_insertions, 15L)
    expect_identical(last_log$n_deactivations, 15L)
    expect_true(as.logical(last_log$success))

    close_connection(conn)
  }
})
# Verifies that a second update on an ALREADY-USED date compacts into minimal history:
# only the 8 rows that actually changed are deactivated/inserted.
test_that("update_snapshot() can update a snapshot on an existing date", {
  for (conn in get_test_conns()) {
    # We now attempt to do another update on the same date as the previous snapshot
    .data <- mtcars %>%
      dplyr::mutate(hp = dplyr::if_else(hp > 100, hp - 10, hp)) %>%
      dplyr::copy_to(conn, df = ., name = unique_table_name())

    # Configure the logger for this update (database logging only)
    db_table <- "test.SCDB_tmp1"
    timestamp <- "2022-10-03 09:00:00"
    logger <- Logger$new(
      db_table = db_table,
      timestamp = timestamp,
      log_path = NULL,
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # This is a more complicated update where a further 8 rows are replaced with 8 new
    # ones on the same date as before.
    # (Uses the `timestamp` variable instead of a duplicated string literal.)
    update_snapshot(.data, conn, db_table, timestamp, logger = logger)

    # Even though we insert twice on the same date, we expect the data to be minimal (compacted)
    target <- dplyr::tbl(conn, id(db_table, conn))

    # The earlier snapshot must be unchanged.
    # BUG FIX: `tibble::tibble()` packs a piped data frame into a single df-column,
    # so the comparison could never match; `tibble::as_tibble()` converts it,
    # consistent with the equivalent check in the other tests.
    expect_identical(
      slice_time(target, "2022-10-01 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      mtcars %>%
        dplyr::arrange(wt, qsec) %>%
        tibble::as_tibble()
    )
    expect_identical(
      nrow(slice_time(target, "2022-10-01 09:00:00")),
      nrow(mtcars)
    )

    # The snapshot on the re-used date must equal the newest data
    expect_identical(
      slice_time(target, "2022-10-03 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      .data %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec)
    )
    expect_identical(
      nrow(slice_time(target, "2022-10-03 09:00:00")),
      nrow(mtcars)
    )

    # Check database log output: only the 8 changed rows should be touched
    logs <- get_table(conn, "test.SCDB_logs") %>%
      dplyr::collect() %>%
      utils::tail(1)
    expect_identical(logs$n_insertions, 8L)
    expect_identical(logs$n_deactivations, 8L)
    expect_true(as.logical(logs$success))

    close_connection(conn)
  }
})
# Verifies retroactive insertion: an update dated BETWEEN two existing snapshots must
# error by default and succeed with enforce_chronological_order = FALSE.
test_that("update_snapshot() can insert a snapshot between existing dates", {
  for (conn in get_test_conns()) {
    # We now attempt an update dated between the two existing snapshots
    .data <- mtcars %>%
      dplyr::mutate(hp = dplyr::if_else(hp > 150, hp - 10, hp)) %>%
      dplyr::copy_to(conn, df = ., name = unique_table_name())

    # This should fail if we do not specify "enforce_chronological_order = FALSE"
    expect_error(
      update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", logger = LoggerNull$new()),
      regexp = "Given timestamp 2022-10-02 09:00:00 is earlier"
    )

    # But not with the variable set
    update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00",
                    logger = LoggerNull$new(), enforce_chronological_order = FALSE)

    target <- dplyr::tbl(conn, id("test.SCDB_tmp1", conn))

    # The earlier snapshot must be unchanged.
    # BUG FIX: `tibble::tibble()` packs a piped data frame into a single df-column,
    # so the comparison could never match; `tibble::as_tibble()` converts it,
    # consistent with the equivalent check in the other tests.
    expect_identical(
      slice_time(target, "2022-10-01 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      mtcars %>%
        dplyr::arrange(wt, qsec) %>%
        tibble::as_tibble()
    )
    expect_identical(
      nrow(slice_time(target, "2022-10-01 09:00:00")),
      nrow(mtcars)
    )

    # The interposed snapshot must equal the new data at its own timestamp
    expect_identical(
      slice_time(target, "2022-10-02 09:00:00") %>%
        dplyr::select(!c("from_ts", "until_ts", "checksum")) %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec),
      .data %>%
        dplyr::collect() %>%
        dplyr::arrange(wt, qsec)
    )
    expect_identical(
      nrow(slice_time(target, "2022-10-02 09:00:00")),
      nrow(mtcars)
    )

    close_connection(conn)
  }
})
# Holistic check: three successive snapshots (the last two at the SAME timestamp with
# collapse_continuous_records = TRUE) must each be reproducible, and the full history
# must contain only the records that actually persisted.
test_that("update_snapshot() works (holistic test 1)", {
  for (conn in get_test_conns()) {
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Create test data for the test: three successive versions of a small table
    t0 <- data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_))
    t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_))
    t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, 2, 3))

    # Copy t0, t1, and t2 to conn
    t0 <- dplyr::copy_to(conn, t0, name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE)
    t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE)
    t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE)

    logger <- LoggerNull$new()

    # First snapshot at 08:00
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01 08:00:00", logger = logger)
    expect_identical(
      dplyr::collect(t0) %>% dplyr::arrange(col1),
      dplyr::collect(get_table(conn, "test.SCDB_tmp1")) %>% dplyr::arrange(col1)
    )

    # Second snapshot ten minutes later, collapsing records with continuous validity
    update_snapshot(
      t1,
      conn,
      "test.SCDB_tmp1",
      "2022-01-01 08:10:00",
      logger = logger,
      collapse_continuous_records = TRUE
    )
    expect_identical(
      dplyr::collect(t1) %>% dplyr::arrange(col1),
      dplyr::collect(get_table(conn, "test.SCDB_tmp1")) %>% dplyr::arrange(col1)
    )

    # Third snapshot at the SAME timestamp as the second
    update_snapshot(
      t2,
      conn,
      "test.SCDB_tmp1",
      "2022-01-01 08:10:00",
      logger = logger,
      collapse_continuous_records = TRUE
    )
    expect_identical(
      dplyr::collect(t2) %>% dplyr::collect(get_table(conn, "test.SCDB_tmp1")) %>% dplyr::arrange(col1)
    )

    # Compare the full history with the distinct records of t0/t1/t2.
    # NOTE(review): head(5) presumably drops the last record of the arranged union
    # (("C", NA), sorted last since NA sorts last) because it was superseded by ("C", 3)
    # at the same timestamp and should not persist in the history — confirm.
    t <- list(t0, t1, t2) %>%
      purrr::reduce(dplyr::union) %>%
      dplyr::collect() %>%
      dplyr::mutate(col2 = as.character(col2)) %>%
      dplyr::arrange(col1, col2) %>%
      utils::head(5)
    t_ref <- get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) %>%
      dplyr::select(!any_of(c("from_ts", "until_ts", "checksum"))) %>%
      dplyr::collect() %>%
      dplyr::mutate(col2 = as.character(col2)) %>%
      dplyr::arrange(col1, col2)
    expect_identical(t, t_ref)

    close_connection(conn)
  }
})
# Holistic check of non-chronological insertion: snapshots at 2022-01-01 and 2022-03-01
# are created first, then 2022-02-01 is inserted retroactively; the resulting validity
# intervals (from_ts/until_ts) of the full history are verified against a reference.
test_that("update_snapshot() works (holistic test 2)", {
  for (conn in get_test_conns()) {
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Create test data for the test: three versions of a small table
    t0 <- data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_))
    t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_))
    t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, 2, 3))

    # Copy t0, t1, and t2 to conn (and suppress check_from message)
    t0 <- dplyr::copy_to(conn, t0, name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE)
    t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE)
    t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE)

    # Check non-chronological insertion: insert the first and the last snapshot first...
    logger <- LoggerNull$new()
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger)
    expect_identical(dplyr::collect(t0) %>% dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) %>% dplyr::arrange(col1))
    update_snapshot(t2, conn, "test.SCDB_tmp1", "2022-03-01", logger = logger)
    expect_identical(dplyr::collect(t2) %>% dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) %>% dplyr::arrange(col1))

    # ...then t1 retroactively in between (requires enforce_chronological_order = FALSE)
    update_snapshot(t1, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger, enforce_chronological_order = FALSE)
    expect_identical(
      dplyr::collect(t1) %>% dplyr::arrange(col1),
      dplyr::collect(get_table(conn, "test.SCDB_tmp1", slice_ts = "2022-02-01")) %>% dplyr::arrange(col1)
    )

    # Reference history: one row per persisted record with its validity interval
    # (NA until_ts means "still valid")
    t_ref <-
      tibble::tibble(col1 = c("A", "B", "A", "C", "B", "C"),
                     col2 = c(NA_real_, NA_real_, 1, NA_real_, 2, 3),
                     from_ts = c("2022-01-01", "2022-01-01", "2022-02-01", "2022-02-01", "2022-03-01", "2022-03-01"),
                     until_ts = c("2022-02-01", "2022-03-01", NA, "2022-03-01", NA, NA))
    expect_identical(
      get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) %>%
        dplyr::select(!"checksum") %>%
        dplyr::collect() %>%
        # Timestamps are formatted as strings so the comparison works across backends
        dplyr::mutate(from_ts = strftime(from_ts),
                      until_ts = strftime(until_ts)) %>%
        dplyr::arrange(col1, from_ts),
      t_ref %>%
        dplyr::arrange(col1, from_ts)
    )

    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    connection_clean_up(conn)
  }
})
# Verifies that applying the exact same data twice is a no-op: the second update must
# change nothing and the logs must record zero insertions/deactivations for it.
test_that("update_snapshot() handles 'NULL' updates", {
  for (conn in get_test_conns()) {
    # Start from clean target and log tables
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn))

    # Use mtcars as the test data set
    .data <- mtcars %>%
      dplyr::copy_to(conn, df = ., name = unique_table_name())
    defer_db_cleanup(.data)

    # Helper constructing a database-only logger for a given timestamp, so each of the
    # two updates below gets its own log row
    db_table <- "test.SCDB_tmp1"
    create_logger <- function(timestamp) {
      Logger$new(
        db_table = db_table,
        timestamp = timestamp,
        log_path = NULL,
        log_table_id = "test.SCDB_logs",
        log_conn = conn,
        output_to_console = FALSE
      )
    }

    # First update inserts all of mtcars; store the full history
    update_snapshot(.data, conn, db_table, "2022-10-03 09:00:00", logger = create_logger("2022-10-03 09:00:00"))
    target_data_1 <- get_table(conn, db_table, slice_ts = NULL) %>% dplyr::collect()

    # Second update with the SAME data on a later date; store the full history again
    update_snapshot(.data, conn, db_table, "2022-10-04 09:00:00", logger = create_logger("2022-10-04 09:00:00"))
    target_data_2 <- get_table(conn, db_table, slice_ts = NULL) %>% dplyr::collect()

    # Check that the two updates are identical (the second was a no-op)
    expect_identical(target_data_1, target_data_2)

    # Confirm with logs that no updates have been made: first row records the initial
    # insertion of nrow(mtcars) rows, second row records zero changes
    logs <- get_table(conn, id("test.SCDB_logs", conn)) %>%
      dplyr::collect() %>%
      dplyr::arrange(date)
    expect_identical(logs$n_insertions, c(nrow(mtcars), 0L))
    expect_identical(logs$n_deactivations, c(0L, 0L))

    connection_clean_up(conn)
  }
})
# Verifies that update_snapshot() accepts an Id object (from id()) as the target table.
test_that("update_snapshot() works with Id objects", {
  withr::local_options("SCDB.log_path" = NULL) # Disable file logging

  for (conn in get_test_conns()) {
    target_table <- id("test.mtcars_modified", conn)

    # A logger that performs neither file nor database logging (warning suppressed)
    logger <- Logger$new(
      output_to_console = FALSE,
      timestamp = Sys.time(),
      db_table = "test.mtcars_modified",
      log_conn = NULL,
      log_table_id = NULL,
      warn = FALSE
    )

    # Shuffle one column so the update is non-trivial, then update using the Id object
    expect_no_error(
      dplyr::copy_to(
        dest = conn,
        df = dplyr::mutate(mtcars, disp = sample(mtcars$disp, nrow(mtcars))),
        name = unique_table_name()
      ) %>%
        update_snapshot(
          conn = conn,
          db_table = target_table,
          logger = logger,
          timestamp = format(Sys.time())
        )
    )

    connection_clean_up(conn)
  }
})
# Verifies update_snapshot()'s input validation: mismatched columns and a target that
# is not in historical format must both raise informative errors.
test_that("update_snapshot() checks table formats", {
  withr::local_options("SCDB.log_path" = tempdir())
  for (conn in get_test_conns()) {
    # Pre-existing historical-table fixture on the connection
    mtcars_table <- dplyr::tbl(conn, id("__mtcars_historical", conn = conn))
    timestamp <- Sys.time()

    # A logger with neither file nor database logging emits a configuration warning
    expect_warning(
      logger <- Logger$new(log_path = NULL, log_table_id = NULL, output_to_console = FALSE), # nolint: implicit_assignment_linter
      "NO file or database logging will be done."
    )

    # Test columns not matching: dropping "mpg" makes the incoming data incompatible
    broken_table <- dplyr::copy_to(conn, dplyr::select(mtcars, !"mpg"), name = "mtcars_broken", overwrite = TRUE)
    expect_error(
      update_snapshot(
        .data = broken_table,
        conn = conn,
        db_table = mtcars_table,
        timestamp = timestamp,
        logger = logger
      ),
      "Columns do not match!"
    )

    # Remove any log files produced by the failed update attempt above
    file.remove(list.files(getOption("SCDB.log_path"), pattern = format(timestamp, "^%Y%m%d.%H%M"), full.names = TRUE))

    # Test target table not being a historical table (plain "__mtcars" fixture)
    expect_error(
      update_snapshot(
        dplyr::tbl(conn, id("__mtcars", conn = conn)),
        conn,
        id("__mtcars", conn = conn),
        timestamp = timestamp,
        logger = logger
      ),
      "Table does not seem like a historical table"
    )

    connection_clean_up(conn)
  }
})
# Verifies cross-connection transfers: updating a snapshot on one connection from data
# living on ANOTHER connection, in both directions (SQLite -> test conn, test conn ->
# SQLite), while preserving column classes.
test_that("update_snapshot() works with across connection", {
  skip_if_not_installed("RSQLite")
  withr::local_options("SCDB.log_path" = NULL) # No file logging

  # Test a data transfer from a local SQLite to the test connection
  source_conn <- DBI::dbConnect(RSQLite::SQLite())

  # Create a table for the tests (add rownames as a character column to mix types)
  mtcars_modified <- mtcars %>%
    dplyr::mutate(name = rownames(mtcars))

  # Copy table to the source
  .data <- dplyr::copy_to(dest = source_conn, df = mtcars_modified, name = unique_table_name())

  # For each conn, we test if update_snapshot preserves data types
  for (target_conn in get_test_conns()) {
    target_table <- id("test.mtcars_modified", target_conn)
    if (DBI::dbExistsTable(target_conn, target_table)) DBI::dbRemoveTable(target_conn, target_table)

    logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(
        .data,
        conn = target_conn,
        db_table = target_table,
        logger = logger,
        timestamp = format(Sys.time())
      )
    )

    # Check that if we collect the table, the signature (first class of each column)
    # will match the original
    table_signature <- get_table(target_conn, target_table) %>%
      dplyr::collect() %>%
      dplyr::summarise(dplyr::across(tidyselect::everything(), ~ class(.)[1])) %>%
      as.data.frame()
    expect_identical(
      table_signature,
      dplyr::summarise(mtcars_modified, dplyr::across(tidyselect::everything(), ~ class(.)[1]))
    )

    DBI::dbRemoveTable(target_conn, target_table)
    connection_clean_up(target_conn)
    rm(logger)
    # Force finalizers to run so backend resources are released before the next loop
    invisible(gc())
  }
  connection_clean_up(source_conn)

  ## Now we test the reverse transfer
  # Test a data transfer from the test connection to a local SQLite
  target_conn <- DBI::dbConnect(RSQLite::SQLite())

  # For each conn, we test if update_snapshot preserves data types
  for (source_conn in get_test_conns()) {
    .data <- dplyr::copy_to(dest = source_conn, df = mtcars_modified, name = unique_table_name())
    target_table <- id("mtcars_modified", target_conn)
    if (DBI::dbExistsTable(target_conn, target_table)) DBI::dbRemoveTable(target_conn, target_table)

    logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(
        .data,
        conn = target_conn,
        db_table = target_table,
        logger = logger,
        timestamp = format(Sys.time())
      )
    )

    # Check that if we collect the table, the signature will match the original
    table_signature <- get_table(target_conn, target_table) %>%
      dplyr::collect() %>%
      dplyr::summarise(dplyr::across(tidyselect::everything(), ~ class(.)[1])) %>%
      as.data.frame()
    expect_identical(
      table_signature,
      dplyr::summarise(mtcars_modified, dplyr::across(tidyselect::everything(), ~ class(.)[1]))
    )

    connection_clean_up(source_conn)
    rm(logger)
    invisible(gc())
  }
  connection_clean_up(target_conn)
})
# (Trailing documentation-site boilerplate removed — the following lines were scraping
# residue from a package-documentation website, not part of the test file:
#   "Any scripts or data that you put into this service are public." /
#   "Add the following code to your website." /
#   "For more information on customizing the embed code, read Embedding Snippets.")