Nothing
for (conn in get_test_conns()) {
# Make sure neither target table lingers from a previous run
for (stale_table in c("test.SCDB_tmp1", "test.SCDB_tmp2")) {
  stale_id <- id(stale_table, conn)
  if (DBI::dbExistsTable(conn, stale_id)) {
    DBI::dbRemoveTable(conn, stale_id)
  }
  expect_false(table_exists(conn, stale_table))
}

# Local fixtures: three versions of the same small data set, which the tests
# feed to update_snapshot() as successive snapshots
t0 <- data.frame(col1 = c("A", "B"), col2 = rep(NA_real_, 2L))
t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1, rep(NA_real_, 2L)))
t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(NA_real_, 2, 3))

# Upload each fixture as a non-temporary table on conn (overwriting any
# existing copy); from here on t0, t1, and t2 refer to the remote tables
t0 <- dplyr::copy_to(
  dest = conn,
  df = t0,
  name = id("test.SCDB_t0", conn),
  overwrite = TRUE,
  temporary = FALSE
)
t1 <- dplyr::copy_to(
  dest = conn,
  df = t1,
  name = id("test.SCDB_t1", conn),
  overwrite = TRUE,
  temporary = FALSE
)
t2 <- dplyr::copy_to(
  dest = conn,
  df = t2,
  name = id("test.SCDB_t2", conn),
  overwrite = TRUE,
  temporary = FALSE
)

# Logger shared by the tests that pass it to update_snapshot()
logger_null <- LoggerNull$new()
test_that("delta loading works for incremental backups", {
  source_table <- "test.SCDB_tmp1"  # Built step by step with update_snapshot()
  replica_table <- "test.SCDB_tmp2" # Rebuilt from the exported deltas

  # Fetch a table with slice_ts = NULL and return it locally in a fixed row
  # order, so two tables can be compared with expect_identical()
  sorted_contents <- function(db_table) {
    local_copy <- dplyr::collect(get_table(conn, db_table, slice_ts = NULL))
    dplyr::arrange(local_copy, col1, col2, from_ts)
  }

  # Remove the replica table (if present) so deltas can be replayed from scratch
  drop_replica <- function() {
    replica_id <- id(replica_table, conn)
    if (DBI::dbExistsTable(conn, replica_id)) {
      DBI::dbRemoveTable(conn, replica_id)
    }
  }

  ## Update 1: snapshot t0, export the delta, replay it on the replica
  update_snapshot(
    .data = t0,
    conn = conn,
    db_table = source_table,
    timestamp = "2022-01-01 08:00:00",
    logger = logger_null
  )
  delta_1 <- delta_export(
    conn = conn,
    db_table = source_table,
    timestamp_from = "2022-01-01 08:00:00"
  )
  defer_db_cleanup(delta_1)
  expect_identical(nrow(delta_1), 2L)

  delta_load(conn, db_table = replica_table, delta = delta_1)
  expect_identical(sorted_contents(replica_table), sorted_contents(source_table))

  ## Update 2: snapshot t1, export the delta, replay it on the replica
  update_snapshot(
    .data = t1,
    conn = conn,
    db_table = source_table,
    timestamp = "2022-01-01 08:10:00",
    logger = logger_null
  )
  delta_2 <- delta_export(
    conn = conn,
    db_table = source_table,
    timestamp_from = "2022-01-01 08:10:00"
  )
  defer_db_cleanup(delta_2)
  expect_identical(nrow(delta_2), 3L)

  delta_load(conn, db_table = replica_table, delta = delta_2)
  expect_identical(sorted_contents(replica_table), sorted_contents(source_table))

  ## Update 3: snapshot t2, export the delta, replay it on the replica
  update_snapshot(
    .data = t2,
    conn = conn,
    db_table = source_table,
    timestamp = "2022-01-01 08:20:00",
    logger = logger_null
  )
  delta_3 <- delta_export(
    conn = conn,
    db_table = source_table,
    timestamp_from = "2022-01-01 08:20:00"
  )
  defer_db_cleanup(delta_3)
  expect_identical(nrow(delta_3), 6L)

  delta_load(conn, db_table = replica_table, delta = delta_3)
  expect_identical(sorted_contents(replica_table), sorted_contents(source_table))

  ## Rebuild the replica from nothing, applying the deltas in order
  drop_replica()
  expect_false(table_exists(conn, "test.SCDB_tmp2"))

  for (delta in list(delta_1, delta_2, delta_3)) {
    delta_load(conn, db_table = replica_table, delta = delta)
  }
  expect_identical(sorted_contents(replica_table), sorted_contents(source_table))

  ## Rebuild the replica again, this time applying the deltas out of order
  drop_replica()
  expect_false(table_exists(conn, "test.SCDB_tmp2"))

  for (delta in list(delta_1, delta_3, delta_2)) {
    delta_load(conn, db_table = replica_table, delta = delta)
  }
  expect_identical(sorted_contents(replica_table), sorted_contents(source_table))
})
test_that("delta loading works to migrate data", {

  # Fetch a table from the target connection with slice_ts = NULL and return it
  # locally in a fixed row order, so two tables can be compared directly
  sorted_contents <- function(db_table) {
    get_table(conn, db_table, slice_ts = NULL) %>%
      dplyr::collect() %>%
      dplyr::arrange(col1, col2, from_ts)
  }

  # Drop a table on the target connection if it exists
  drop_if_exists <- function(db_table) {
    table_id <- id(db_table, conn)
    if (DBI::dbExistsTable(conn, table_id)) {
      DBI::dbRemoveTable(conn, table_id)
    }
  }

  # Load a delta into test.SCDB_tmp1 on the target connection, silencing messages
  load_quietly <- function(delta) {
    suppressMessages(
      delta_load(
        conn,
        db_table = "test.SCDB_tmp1",
        delta = delta
      )
    )
  }

  # Re-build the table on another connection
  # (one with date-time (duckdb) and one without (sqlite))
  source_conns <- list()

  # The package is called "RSQLite"; checking for "SQLite" is always FALSE and
  # would silently skip the SQLite source
  if (rlang::is_installed("RSQLite")) {
    source_conns[["SQLite"]] <- get_connection(RSQLite::SQLite(), dbname = ":memory:")
  }

  # DuckDB requires R >= 4.0.0 to run in memory. Compare version objects rather
  # than strings, since string comparison misorders multi-digit major versions
  if (rlang::is_installed("duckdb") && getRversion() >= "4.0.0") {
    source_conns[["DuckDB"]] <- get_connection(duckdb::duckdb(), dbdir = ":memory:")
  }

  for (source_conn in source_conns) {

    # Build the full three-step history in table "source" on the source connection
    # Update 1
    update_snapshot(
      .data = t0,
      conn = source_conn,
      db_table = "source",
      timestamp = "2022-01-01 08:00:00",
      logger = logger_null
    )

    # Update 2
    update_snapshot(
      .data = t1,
      conn = source_conn,
      db_table = "source",
      timestamp = "2022-01-01 08:10:00",
      logger = logger_null
    )

    # Update 3
    update_snapshot(
      .data = t2,
      conn = source_conn,
      db_table = "source",
      timestamp = "2022-01-01 08:20:00",
      logger = logger_null
    )


    ## Test 1 ##############################################################
    # Clear test tables on the target connection
    drop_if_exists("test.SCDB_tmp1")
    expect_false(table_exists(conn, "test.SCDB_tmp1"))
    drop_if_exists("test.SCDB_tmp2")
    expect_false(table_exists(conn, "test.SCDB_tmp2"))

    # Export first timestamp
    delta_1 <- delta_export(
      conn = source_conn,
      db_table = "source",
      timestamp_from = "2022-01-01 08:00:00",
      timestamp_until = "2022-01-01 08:00:01" # One second after the first update
    )
    defer_db_cleanup(delta_1)

    # .. and load from a fresh state
    load_quietly(delta_1)

    # Generate the equivalent state directly on the target connection
    update_snapshot(
      .data = t0,
      conn = conn,
      db_table = "test.SCDB_tmp2",
      timestamp = "2022-01-01 08:00:00",
      logger = logger_null
    )

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 2 ##############################################################
    # Export second timestamp
    delta_2 <- delta_export(
      conn = source_conn,
      db_table = "source",
      timestamp_from = "2022-01-01 08:10:00",
      timestamp_until = "2022-01-01 08:10:01" # One second after the second update
    )
    defer_db_cleanup(delta_2)

    # .. add to existing state
    load_quietly(delta_2)

    # Generate the equivalent state directly on the target connection
    update_snapshot(
      .data = t1,
      conn = conn,
      db_table = "test.SCDB_tmp2",
      timestamp = "2022-01-01 08:10:00",
      logger = logger_null
    )

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 2.5 ############################################################
    # Re-apply the second delta; the tables are expected to still match
    load_quietly(delta_2)

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 3 ##############################################################
    # Export third timestamp
    delta_3 <- delta_export(
      conn = source_conn,
      db_table = "source",
      timestamp_from = "2022-01-01 08:20:00",
      timestamp_until = "2022-01-01 08:20:01" # One second after the third update
    )
    defer_db_cleanup(delta_3)

    # .. add to existing state
    load_quietly(delta_3)

    # Generate the equivalent state directly on the target connection
    update_snapshot(
      .data = t2,
      conn = conn,
      db_table = "test.SCDB_tmp2",
      timestamp = "2022-01-01 08:20:00",
      logger = logger_null
    )

    # Check transfer success (now until_ts should also match)
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 4 ##############################################################
    # Clear the delta-loaded table; test.SCDB_tmp2 is kept as the reference
    drop_if_exists("test.SCDB_tmp1")
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Run a single batch update covering all three updates
    delta_batch <- delta_export(
      conn = source_conn,
      db_table = "source",
      timestamp_from = "2022-01-01 08:00:00",
      timestamp_until = "2022-01-01 08:20:01" # One second after the third update
    )
    defer_db_cleanup(delta_batch)

    # Add delta to the target connection
    load_quietly(delta_batch)

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 4.5 ############################################################
    # Clear the delta-loaded table; test.SCDB_tmp2 is kept as the reference
    drop_if_exists("test.SCDB_tmp1")
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Run a single batch update (no timestamp_until)
    delta_batch_open_ended <- delta_export(
      conn = source_conn,
      db_table = "source",
      timestamp_from = "2022-01-01 08:10:00" # From second update
    )
    defer_db_cleanup(delta_batch_open_ended)

    # Generate partial state (first update only) on the target connection
    update_snapshot(
      .data = t0,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:00:00",
      logger = logger_null
    )

    # Add delta to the partial state on target connection
    load_quietly(delta_batch_open_ended)

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))


    ## Test 5 ##############################################################
    # Clear the delta-loaded table; test.SCDB_tmp2 is kept as the reference
    drop_if_exists("test.SCDB_tmp1")
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Out of order application
    load_quietly(delta_1)
    load_quietly(delta_3)
    load_quietly(delta_2)

    # Check transfer success
    expect_identical(sorted_contents("test.SCDB_tmp1"), sorted_contents("test.SCDB_tmp2"))

    close_connection(source_conn)
  }
})
test_that("delta loading writes logs as expected", {

  # Drop a table on the connection if it exists
  drop_if_exists <- function(db_table) {
    table_id <- id(db_table, conn)
    if (DBI::dbExistsTable(conn, table_id)) {
      DBI::dbRemoveTable(conn, table_id)
    }
  }

  # Read the log entries in test.SCDB_logs that belong to `db_table` and return
  # their date and insertion / deactivation counts, ordered by date
  logged_counts <- function(db_table) {
    dplyr::tbl(conn, id("test.SCDB_logs", conn)) %>%
      tidyr::unite("db_table_name", dplyr::any_of(c("catalog", "schema", "table")), sep = ".") %>%
      dplyr::filter(.data$db_table_name == !!as.character(id(db_table, conn))) %>%
      dplyr::select("date", "n_insertions", "n_deactivations") %>%
      dplyr::collect() %>%
      dplyr::arrange(.data$date)
  }

  # Start with neither data table nor the log table present
  drop_if_exists("test.SCDB_tmp1")
  expect_false(table_exists(conn, "test.SCDB_tmp1"))
  drop_if_exists("test.SCDB_tmp2")
  expect_false(table_exists(conn, "test.SCDB_tmp2"))
  drop_if_exists("test.SCDB_logs")
  expect_false(table_exists(conn, "test.SCDB_logs"))

  # Configure a logger for the normal update steps
  logger_1 <- Logger$new(
    db_table = "test.SCDB_tmp1",
    log_table_id = "test.SCDB_logs",
    log_conn = conn,
    output_to_console = FALSE
  )

  # Build a test state, exporting a delta after every update
  # Update 1
  update_snapshot(
    .data = t0,
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp = "2022-01-01 08:00:00",
    logger = logger_1
  )
  delta_1 <- delta_export(
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp_from = "2022-01-01 08:00:00"
  )
  defer_db_cleanup(delta_1)

  # Update 2
  update_snapshot(
    .data = t1,
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp = "2022-01-01 08:10:00",
    logger = logger_1
  )
  delta_2 <- delta_export(
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp_from = "2022-01-01 08:10:00"
  )
  defer_db_cleanup(delta_2)

  # Update 3
  update_snapshot(
    .data = t2,
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp = "2022-01-01 08:20:00",
    logger = logger_1
  )
  delta_3 <- delta_export(
    conn = conn,
    db_table = "test.SCDB_tmp1",
    timestamp_from = "2022-01-01 08:20:00"
  )
  defer_db_cleanup(delta_3)

  # Configure a logger for the delta load. Console output is switched off to
  # match logger_1; only the entries written to the log table are checked
  logger_2 <- Logger$new(
    db_table = "test.SCDB_tmp2",
    log_table_id = "test.SCDB_logs",
    log_conn = conn,
    output_to_console = FALSE
  )

  # Replay deltas
  delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_1, logger = logger_2)
  delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_2, logger = logger_2)
  delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_3, logger = logger_2)

  # Check transfer success
  expect_identical(
    get_table(conn, "test.SCDB_tmp2", slice_ts = NULL) %>%
      dplyr::collect() %>%
      dplyr::arrange(col1, col2, from_ts),
    get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) %>%
      dplyr::collect() %>%
      dplyr::arrange(col1, col2, from_ts)
  )

  # Check logs match insertions and deactivations
  expect_identical(
    logged_counts("test.SCDB_tmp2"), # delta_load() logs
    logged_counts("test.SCDB_tmp1")  # update_snapshot() logs
  )
})
# All tests for this connection have run; close it before the next loop iteration
close_connection(conn)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.