# Tests for update_snapshot()
test_that("update_snapshot() works", {
  for (conn in get_test_conns()) {

    # Start from a clean slate: drop target and log tables left behind by earlier runs
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn))

    # Build the initial snapshot of mtcars, valid from 2022-10-01 and currently open-ended (until_ts = NA)
    target <- mtcars |>
      dplyr::copy_to(conn, df = _, name = unique_table_name()) |>
      digest_to_checksum(col = "checksum") |>
      dplyr::mutate(from_ts = !!db_timestamp("2022-10-01 09:00:00", conn),
                    until_ts = !!db_timestamp(NA, conn))

    # Copy target to conn
    target <- dplyr::copy_to(conn, target, name = id("test.SCDB_tmp1", conn), overwrite = TRUE, temporary = FALSE)

    # Modified data set: hp reduced by 10 where hp > 130
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 130, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    # This is a simple update where 23 rows are replaced with 23 new ones on the given date
    db_table <- "test.SCDB_tmp1"
    timestamp <- "2022-10-03 09:00:00"

    log_path <- tempdir()

    logger <- Logger$new(
      db_table = db_table,
      timestamp = timestamp,
      log_path = log_path,
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # Remove stale .log files so the log-file assertions below are unambiguous
    dir(log_path) |>
      purrr::keep(~ endsWith(., ".log")) |>
      purrr::walk(~ unlink(file.path(log_path, .)))

    update_snapshot(.data, conn, db_table, timestamp, logger = logger)

    # The original snapshot (2022-10-01) must be unchanged by the update
    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    # The new snapshot (2022-10-03) must equal the updated data
    expect_identical(slice_time(target, "2022-10-03 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")),
                 nrow(mtcars))

    # Check file log outputs exists
    log_pattern <- glue::glue("{stringr::str_replace_all(as.Date(timestamp), '-', '_')}.{id(db_table, conn)}.log")
    log_file <- purrr::keep(dir(log_path), ~stringr::str_detect(., log_pattern))
    expect_length(log_file, 1)
    expect_gt(file.info(file.path(log_path, log_file))$size, 0)

    # Exactly one database log row, and it must reference the log file
    expect_identical(nrow(get_table(conn, "test.SCDB_logs")), 1)
    db_logs_with_log_file <- get_table(conn, "test.SCDB_logs") |>
      dplyr::filter(!is.na(.data$log_file))
    expect_identical(nrow(db_logs_with_log_file), 1)

    # Check database log output
    logs <- get_table(conn, "test.SCDB_logs") |> dplyr::collect()

    # The logs should have specified data types
    types <- c(
      "date" = "POSIXct",
      "catalog" = "character",
      "schema" = "character",
      "table" = "character",
      "n_insertions" = "numeric",
      "n_deactivations" = "numeric",
      "start_time" = "POSIXct",
      "end_time" = "POSIXct",
      "duration" = "character",
      "success" = "logical",
      "message" = "character"
    )

    if (inherits(conn, "SQLiteConnection")) {
      # NOTE: .f must be a formula; a bare string is treated by purrr::as_mapper()
      # as a pluck() extractor, not as a constant replacement.
      # (Also fixes the "numneric" typo in the original.)
      types <- types |>
        purrr::map_if(~ identical(., "POSIXct"), ~ "character") |> # SQLite does not support POSIXct
        purrr::map_if(~ identical(., "logical"), ~ "numeric") |> # SQLite does not support logical
        as.character()
    }

    checkmate::expect_data_frame(logs, nrows = 1, types)

    # We now attempt to do another update on the same date
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 100, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-03 09:00:00", logger = logger)

    # Even though we insert twice on the same date, we expect the data to be minimal (compacted)
    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    expect_identical(slice_time(target, "2022-10-03 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")),
                 nrow(mtcars))

    # We now attempt to an update between these two updates
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 150, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    # This should fail if we do not specify "enforce_chronological_order = FALSE"
    expect_error(
      update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", logger = logger),
      regexp = "Given timestamp 2022-10-02 09:00:00 is earlier"
    )

    # But not with the variable set
    update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00",
                    logger = logger, enforce_chronological_order = FALSE)

    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    expect_identical(slice_time(target, "2022-10-02 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-02 09:00:00")),
                 nrow(mtcars))

    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Check deletion of redundant rows
    t0 <- data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_))
    t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_))
    t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, 2, 3))

    # Copy t0, t1, and t2 to conn
    t0 <- dplyr::copy_to(conn, t0, name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE)
    t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE)
    t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE)

    # After each chronological update, the current snapshot must equal the input
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger)
    expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    update_snapshot(t1, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger)
    expect_identical(dplyr::collect(t1) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    update_snapshot(t2, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger)
    expect_identical(dplyr::collect(t2) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    # The full history must be the minimal (compacted) union of the three inputs
    t <- list(t0, t1, t2) |>
      purrr::reduce(dplyr::union) |>
      dplyr::collect() |>
      dplyr::mutate(col2 = as.character(col2)) |>
      dplyr::arrange(col1, col2) |>
      utils::head(5)

    t_ref <- get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |>
      dplyr::select(!any_of(c("from_ts", "until_ts", "checksum"))) |>
      dplyr::collect() |>
      dplyr::mutate(col2 = as.character(col2)) |>
      dplyr::arrange(col1, col2)

    expect_identical(t, t_ref)

    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))

    # Check non-chronological insertion
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger)
    expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    update_snapshot(t2, conn, "test.SCDB_tmp1", "2022-03-01", logger = logger)
    expect_identical(dplyr::collect(t2) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    # Insert t1 in-between the two existing timestamps
    update_snapshot(t1, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger, enforce_chronological_order = FALSE)
    expect_identical(dplyr::collect(t1) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1", slice_ts = "2022-02-01")) |> dplyr::arrange(col1))

    # Expected full history after the out-of-order insertion
    t_ref <-
      tibble::tibble(col1 = c("A", "B", "A", "C", "B", "C"),
                     col2 = c(NA_real_, NA_real_, 1, NA_real_, 2, 3),
                     from_ts = c("2022-01-01", "2022-01-01", "2022-02-01", "2022-02-01", "2022-03-01", "2022-03-01"),
                     until_ts = c("2022-02-01", "2022-03-01", NA, "2022-03-01", NA, NA))

    expect_identical(get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |>
                       dplyr::select(!"checksum") |>
                       dplyr::collect() |>
                       dplyr::mutate(from_ts = strftime(from_ts),
                                     until_ts = strftime(until_ts)) |>
                       dplyr::arrange(col1, from_ts),
                     t_ref |>
                       dplyr::arrange(col1, from_ts))

    # Clean up log file and target table before the next connection
    if (file.exists(logger$log_realpath)) file.remove(logger$log_realpath)
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))

    connection_clean_up(conn)
  }
})
test_that("update_snapshot works with Id objects", {
  # Disable file logging for the duration of this test
  withr::local_options("SCDB.log_path" = NULL)

  for (conn in get_test_conns()) {

    target_table <- id("test.mtcars_modified", conn)

    # Logger with neither file nor database logging (and no warning about it)
    logger <- Logger$new(
      db_table = "test.mtcars_modified",
      timestamp = Sys.time(),
      log_table_id = NULL,
      log_conn = NULL,
      output_to_console = FALSE,
      warn = FALSE
    )

    # Shuffle "disp" so the payload differs from plain mtcars, then copy to the connection
    shuffled_mtcars <- mtcars |>
      dplyr::mutate(disp = sample(mtcars$disp, nrow(mtcars)))
    source_table <- dplyr::copy_to(dest = conn, df = shuffled_mtcars, name = unique_table_name())

    # update_snapshot() should accept an Id object as the target table reference
    expect_no_error(
      update_snapshot(
        source_table,
        conn = conn,
        db_table = target_table,
        logger = logger,
        timestamp = format(Sys.time())
      )
    )

    connection_clean_up(conn)
  }
})
test_that("update_snapshot checks table formats", {
  # Route file logs to a temporary directory for the duration of the test
  withr::local_options("SCDB.log_path" = tempdir())

  for (conn in get_test_conns()) {

    mtcars_table <- dplyr::tbl(conn, id("__mtcars_historical", conn = conn))
    timestamp <- Sys.time()

    # A logger with no file or database sink should warn on creation
    expect_warning(
      logger <- Logger$new(log_path = NULL, log_table_id = NULL, output_to_console = FALSE), # nolint: implicit_assignment_linter
      "NO file or database logging will be done."
    )

    # Test columns not matching: the source table is missing the "mpg" column
    broken_table <- dplyr::copy_to(conn, dplyr::select(mtcars, !"mpg"), name = "mtcars_broken", overwrite = TRUE)
    expect_error(
      update_snapshot(
        .data = broken_table,
        conn = conn,
        db_table = mtcars_table,
        timestamp = timestamp,
        logger = logger
      ),
      "Columns do not match!"
    )

    # Remove the log files produced by the failed update above
    file.remove(list.files(getOption("SCDB.log_path"), pattern = format(timestamp, "^%Y%m%d.%H%M"), full.names = TRUE))

    # Test target table not being a historical table
    plain_table <- id("__mtcars", conn = conn)
    expect_error(
      update_snapshot(
        .data = dplyr::tbl(conn, plain_table),
        conn = conn,
        db_table = plain_table,
        timestamp = timestamp,
        logger = logger
      ),
      "Table does not seem like a historical table"
    )

    connection_clean_up(conn)
  }
})
test_that("update_snapshot works with across connection", {
  skip_if_not_installed("RSQLite")
  withr::local_options("SCDB.log_path" = NULL) # No file logging

  # Helper: collect a table from a connection and report the (first) class of each column
  collect_signature <- function(conn, table) {
    get_table(conn, table) |>
      dplyr::collect() |>
      dplyr::summarise(dplyr::across(tidyselect::everything(), ~ class(.)[1])) |>
      as.data.frame()
  }

  # Create a table for the tests (adds a character column on top of mtcars)
  mtcars_modified <- mtcars |>
    dplyr::mutate(name = rownames(mtcars))

  # Column classes of the original data, used as the expected signature throughout
  reference_signature <- dplyr::summarise(mtcars_modified, dplyr::across(tidyselect::everything(), ~ class(.)[1]))

  # Test a data transfer from a local SQLite to the test connection
  source_conn <- DBI::dbConnect(RSQLite::SQLite())
  .data <- dplyr::copy_to(dest = source_conn, df = mtcars_modified, name = unique_table_name())

  # For each conn, we test if update_snapshot preserves data types
  for (target_conn in get_test_conns()) {

    target_table <- id("test.mtcars_modified", target_conn)
    if (DBI::dbExistsTable(target_conn, target_table)) DBI::dbRemoveTable(target_conn, target_table)

    logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(
        .data,
        conn = target_conn,
        db_table = target_table,
        logger = logger,
        timestamp = format(Sys.time())
      )
    )

    # Collecting the table must reproduce the original column classes
    expect_identical(collect_signature(target_conn, target_table), reference_signature)

    DBI::dbRemoveTable(target_conn, target_table)
    connection_clean_up(target_conn)
    rm(logger)
    invisible(gc())
  }
  connection_clean_up(source_conn)

  ## Now we test the reverse transfer
  # Test a data transfer from the test connection to a local SQLite
  target_conn <- DBI::dbConnect(RSQLite::SQLite())

  # For each conn, we test if update_snapshot preserves data types
  for (source_conn in get_test_conns()) {

    .data <- dplyr::copy_to(dest = source_conn, df = mtcars_modified, name = unique_table_name())

    target_table <- id("mtcars_modified", target_conn)
    if (DBI::dbExistsTable(target_conn, target_table)) DBI::dbRemoveTable(target_conn, target_table)

    logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(
        .data,
        conn = target_conn,
        db_table = target_table,
        logger = logger,
        timestamp = format(Sys.time())
      )
    )

    # Collecting the table must reproduce the original column classes
    expect_identical(collect_signature(target_conn, target_table), reference_signature)

    connection_clean_up(source_conn)
    rm(logger)
    invisible(gc())
  }
  connection_clean_up(target_conn)
})
# NOTE(review): removed scraped website boilerplate ("Any scripts or data ...",
# "Add the following code ...", "For more information on customizing the embed
# code ...") — it is not R code and broke parsing of this file.