# tests/testthat/test-delta-loading.R

for (conn in get_test_conns()) {

  # Start from a clean slate: drop target tables left behind by earlier runs
  for (tmp_table in c("test.SCDB_tmp1", "test.SCDB_tmp2")) {
    if (DBI::dbExistsTable(conn, id(tmp_table, conn))) {
      DBI::dbRemoveTable(conn, id(tmp_table, conn))
    }
    expect_false(table_exists(conn, tmp_table))
  }

  # Three successive states of the source data (t0 -> t1 -> t2), stored as
  # permanent tables on `conn`. After this, t0, t1 and t2 refer to the remote
  # copies returned by copy_to(), not to local data frames.
  t0 <- dplyr::copy_to(
    conn,
    data.frame(col1 = c("A", "B"), col2 = c(NA_real_, NA_real_)),
    name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE
  )
  t1 <- dplyr::copy_to(
    conn,
    data.frame(col1 = c("A", "B", "C"), col2 = c(1, NA_real_, NA_real_)),
    name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE
  )
  t2 <- dplyr::copy_to(
    conn,
    data.frame(col1 = c("A", "B", "C"), col2 = c(NA_real_, 2, 3)),
    name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE
  )

  # Logger handed to calls where logging itself is not under test
  logger_null <- LoggerNull$new()


  test_that("delta loading works for incremental backups", {

    # Full history (all versions) of a table on `conn`, sorted so that two
    # tables can be compared row by row
    history_of <- function(db_table) {
      get_table(conn, db_table, slice_ts = NULL) %>%
        dplyr::collect() %>%
        dplyr::arrange(col1, col2, from_ts)
    }

    # Drop the replay target so the deltas can be applied again from scratch
    reset_replay_target <- function() {
      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp2", conn))) {
        DBI::dbRemoveTable(conn, id("test.SCDB_tmp2", conn))
      }
      expect_false(table_exists(conn, "test.SCDB_tmp2"))
    }

    ## Build state on test.SCDB_tmp1 step by step; after each step export the
    ## delta and replay it on test.SCDB_tmp2, which must then match.

    # Update 1
    update_snapshot(
      .data = t0,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:00:00",
      logger = logger_null
    )
    delta_1 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from = "2022-01-01 08:00:00"
    )
    defer_db_cleanup(delta_1)
    expect_identical(nrow(delta_1), 2L)

    delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_1)
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))

    # Update 2
    update_snapshot(
      .data = t1,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:10:00",
      logger = logger_null
    )
    delta_2 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from = "2022-01-01 08:10:00"
    )
    defer_db_cleanup(delta_2)
    expect_identical(nrow(delta_2), 3L)

    delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_2)
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))

    # Update 3
    update_snapshot(
      .data = t2,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:20:00",
      logger = logger_null
    )
    delta_3 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from = "2022-01-01 08:20:00"
    )
    defer_db_cleanup(delta_3)
    expect_identical(nrow(delta_3), 6L)

    delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta_3)
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))

    ## Replay all deltas, in order, on a fresh target
    reset_replay_target()
    for (delta in list(delta_1, delta_2, delta_3)) {
      delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta)
    }
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))

    ## Replay all deltas, out of order, on a fresh target
    reset_replay_target()
    for (delta in list(delta_1, delta_3, delta_2)) {
      delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta)
    }
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))
  })


  test_that("delta loading works to migrate data", {

    # Full history (all versions) of a table on `conn`, sorted so that two
    # tables can be compared row by row
    history_of <- function(db_table) {
      get_table(conn, db_table, slice_ts = NULL) %>%
        dplyr::collect() %>%
        dplyr::arrange(col1, col2, from_ts)
    }

    # Re-build the table on other, in-memory source connections
    # (one with date-time (duckdb) and one without (sqlite))
    source_conns <- list()

    # The SQLite driver used below comes from the RSQLite package; checking for
    # a package named "SQLite" does not detect it, so this backend was skipped.
    if (rlang::is_installed("RSQLite")) {
      source_conns[["SQLite"]] <- get_connection(RSQLite::SQLite(), dbname = ":memory:")
    }

    # DuckDB requires R >= 4.0.0 to run in memory. getRversion() compares
    # version components numerically instead of comparing strings.
    if (rlang::is_installed("duckdb") && getRversion() >= "4.0.0") {
      source_conns[["DuckDB"]] <- get_connection(duckdb::duckdb(), dbdir = ":memory:")
    }

    for (source_conn in source_conns) {

      # Build a three-step history on the source connection

      # Update 1
      update_snapshot(
        .data = t0,
        conn = source_conn,
        db_table = "source",
        timestamp = "2022-01-01 08:00:00",
        logger = logger_null
      )

      # Update 2
      update_snapshot(
        .data = t1,
        conn = source_conn,
        db_table = "source",
        timestamp = "2022-01-01 08:10:00",
        logger = logger_null
      )

      # Update 3
      update_snapshot(
        .data = t2,
        conn = source_conn,
        db_table = "source",
        timestamp = "2022-01-01 08:20:00",
        logger = logger_null
      )


      ## Test 1 ##############################################################

      # Clear test tables on connection
      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
      expect_false(table_exists(conn, "test.SCDB_tmp1"))

      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp2", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp2", conn))
      expect_false(table_exists(conn, "test.SCDB_tmp2"))

      # Export first timestamp
      delta_1 <- delta_export(
        conn = source_conn,
        db_table = "source",
        timestamp_from  = "2022-01-01 08:00:00",
        timestamp_until = "2022-01-01 08:00:01" # Some future point in time
      )
      defer_db_cleanup(delta_1)

      # .. and load from a fresh state
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_1
        )
      )

      # Generate equivalent state on the target connection via update_snapshot()
      update_snapshot(
        .data = t0,
        conn = conn,
        db_table = "test.SCDB_tmp2",
        timestamp = "2022-01-01 08:00:00",
        logger = logger_null
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 2 ##############################################################

      # Export second timestamp
      delta_2 <- delta_export(
        conn = source_conn,
        db_table = "source",
        timestamp_from  = "2022-01-01 08:10:00",
        timestamp_until = "2022-01-01 08:10:01" # Some future point in time
      )
      defer_db_cleanup(delta_2)

      # .. add to existing state
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_2
        )
      )

      # Generate equivalent state on the target connection via update_snapshot()
      update_snapshot(
        .data = t1,
        conn = conn,
        db_table = "test.SCDB_tmp2",
        timestamp = "2022-01-01 08:10:00",
        logger = logger_null
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 2.5 ############################################################

      # Re-apply the second delta: loading the same delta twice must not
      # change the result
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_2
        )
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 3 ##############################################################

      # Export third timestamp
      delta_3 <- delta_export(
        conn = source_conn,
        db_table = "source",
        timestamp_from  = "2022-01-01 08:20:00",
        timestamp_until = "2022-01-01 08:20:01" # Some future point in time
      )
      defer_db_cleanup(delta_3)

      # .. add to existing state
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_3
        )
      )

      # Generate equivalent state on the target connection via update_snapshot()
      update_snapshot(
        .data = t2,
        conn = conn,
        db_table = "test.SCDB_tmp2",
        timestamp = "2022-01-01 08:20:00",
        logger = logger_null
      )

      # Check transfer success (now until_ts should also match)
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 4: single batch covering all updates ###########################

      # Clear test tables on connection
      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
      expect_false(table_exists(conn, "test.SCDB_tmp1"))

      # Run a single batch update
      delta_batch <- delta_export(
        conn = source_conn,
        db_table = "source",
        timestamp_from  = "2022-01-01 08:00:00",
        timestamp_until = "2022-01-01 08:20:01"  # Some future point in time
      )
      defer_db_cleanup(delta_batch)

      # Add delta to the target connection
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_batch
        )
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 5: open-ended batch on top of a partial state ##################

      # Clear test tables on connection
      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
      expect_false(table_exists(conn, "test.SCDB_tmp1"))

      # Run a single batch update (no timestamp_until)
      delta_batch_open_ended <- delta_export(
        conn = source_conn,
        db_table = "source",
        timestamp_from  = "2022-01-01 08:10:00" # From second update
      )
      defer_db_cleanup(delta_batch_open_ended)

      # Generate partial state (first update only) on the target connection
      update_snapshot(
        .data = t0,
        conn = conn,
        db_table = "test.SCDB_tmp1",
        timestamp = "2022-01-01 08:00:00",
        logger = logger_null
      )

      # Add delta to the partial state on target connection
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_batch_open_ended
        )
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      ## Test 6: out of order application ####################################

      # Clear test tables on connection
      if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
      expect_false(table_exists(conn, "test.SCDB_tmp1"))

      # Out of order application
      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_1
        )
      )

      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_3
        )
      )

      suppressMessages(
        delta_load(
          conn,
          db_table = "test.SCDB_tmp1",
          delta = delta_2
        )
      )

      # Check transfer success
      expect_identical(history_of("test.SCDB_tmp1"), history_of("test.SCDB_tmp2"))

      close_connection(source_conn)
    }
  })


  test_that("delta loading writes logs as expected", {

    # Start from a clean slate: both target tables and the log table
    for (table_name in c("test.SCDB_tmp1", "test.SCDB_tmp2", "test.SCDB_logs")) {
      if (DBI::dbExistsTable(conn, id(table_name, conn))) DBI::dbRemoveTable(conn, id(table_name, conn))
      expect_false(table_exists(conn, table_name))
    }

    # Full history (all versions) of a table on `conn`, sorted so that two
    # tables can be compared row by row
    history_of <- function(db_table) {
      get_table(conn, db_table, slice_ts = NULL) %>%
        dplyr::collect() %>%
        dplyr::arrange(col1, col2, from_ts)
    }

    # The date / n_insertions / n_deactivations entries in test.SCDB_logs
    # whose catalog/schema/table identify `db_table`, sorted by date
    logged_counts <- function(db_table) {
      dplyr::tbl(conn, id("test.SCDB_logs", conn)) %>%
        tidyr::unite("db_table_name", dplyr::any_of(c("catalog", "schema", "table")), sep = ".") %>%
        dplyr::filter(.data$db_table_name == !!as.character(id(db_table, conn))) %>%
        dplyr::select("date", "n_insertions", "n_deactivations") %>%
        dplyr::collect() %>%
        dplyr::arrange(.data$date)
    }

    # Configure a logger for the normal update steps
    logger_1 <- Logger$new(
      db_table = "test.SCDB_tmp1",
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # Build a test state
    # Update 1
    update_snapshot(
      .data = t0,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:00:00",
      logger = logger_1
    )

    delta_1 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from  = "2022-01-01 08:00:00"
    )
    defer_db_cleanup(delta_1)

    # Update 2
    update_snapshot(
      .data = t1,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:10:00",
      logger = logger_1
    )

    delta_2 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from  = "2022-01-01 08:10:00"
    )
    defer_db_cleanup(delta_2)

    # Update 3
    update_snapshot(
      .data = t2,
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp = "2022-01-01 08:20:00",
      logger = logger_1
    )

    delta_3 <- delta_export(
      conn = conn,
      db_table = "test.SCDB_tmp1",
      timestamp_from  = "2022-01-01 08:20:00"
    )
    defer_db_cleanup(delta_3)


    # Configure a logger for the delta load. Like logger_1 it writes to the
    # log table only, keeping the test output quiet.
    logger_2 <- Logger$new(
      db_table = "test.SCDB_tmp2",
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # Replay deltas
    for (delta in list(delta_1, delta_2, delta_3)) {
      delta_load(conn, db_table = "test.SCDB_tmp2", delta = delta, logger = logger_2)
    }

    # Check transfer success
    expect_identical(history_of("test.SCDB_tmp2"), history_of("test.SCDB_tmp1"))

    # Check logs match insertions and deactivations:
    # delta_load() logs (tmp2) against update_snapshot() logs (tmp1)
    expect_identical(logged_counts("test.SCDB_tmp2"), logged_counts("test.SCDB_tmp1"))
  })

  close_connection(conn)
}

# Try the SCDB package in your browser
#
# Any scripts or data that you put into this service are public.
#
# SCDB documentation built on May 18, 2026, 9:06 a.m.