# tests/testthat/test-update_snapshot.R

# End-to-end test of update_snapshot(), run once per connection returned by
# get_test_conns(). It covers:
#   * a first update on top of a pre-populated historical table,
#   * file and database logging through a Logger,
#   * a second update on the same timestamp,
#   * an out-of-order update (with and without enforce_chronological_order),
#   * the rows stored after repeated and non-chronological insertions.
test_that("update_snapshot() works", {
  for (conn in get_test_conns()) {

    # Start from a clean slate: drop leftovers from earlier runs
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    if (DBI::dbExistsTable(conn, id("test.SCDB_logs", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_logs", conn))

    # Build the initial state: mtcars with a checksum column, valid from 2022-10-01 and still open (until_ts = NA)
    target <- mtcars |>
      dplyr::copy_to(conn, df = _, name = unique_table_name()) |>
      digest_to_checksum(col = "checksum") |>
      dplyr::mutate(from_ts  = !!db_timestamp("2022-10-01 09:00:00", conn),
                    until_ts = !!db_timestamp(NA, conn))

    # Copy target to conn
    target <- dplyr::copy_to(conn, target, name = id("test.SCDB_tmp1", conn), overwrite = TRUE, temporary = FALSE)

    # New snapshot: same rows as mtcars, but hp is lowered by 10 wherever hp > 130
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 130, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    # This is a simple update where the rows with hp > 130 are replaced with new versions on the given date
    db_table <- "test.SCDB_tmp1"
    timestamp <- "2022-10-03 09:00:00"
    log_path <- tempdir()

    logger <- Logger$new(
      db_table = db_table,
      timestamp = timestamp,
      log_path = log_path,
      log_table_id = "test.SCDB_logs",
      log_conn = conn,
      output_to_console = FALSE
    )

    # Remove any existing *.log files in log_path so that exactly one log file is expected afterwards
    dir(log_path) |>
      purrr::keep(~ endsWith(., ".log")) |>
      purrr::walk(~ unlink(file.path(log_path, .)))

    update_snapshot(.data, conn, db_table, timestamp, logger = logger)

    # The state at the original timestamp must still be the unmodified mtcars
    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    # The state at the update timestamp must equal the new snapshot
    expect_identical(slice_time(target, "2022-10-03 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")),
                 nrow(mtcars))

    # Check file log outputs exists
    log_pattern <- glue::glue("{stringr::str_replace_all(as.Date(timestamp), '-', '_')}.{id(db_table, conn)}.log")
    log_file <- purrr::keep(dir(log_path), ~stringr::str_detect(., log_pattern))
    expect_length(log_file, 1)
    expect_gt(file.info(file.path(log_path, log_file))$size, 0)
    expect_identical(nrow(get_table(conn, "test.SCDB_logs")), 1)

    # The single database log entry must reference a log file
    db_logs_with_log_file <- get_table(conn, "test.SCDB_logs") |>
      dplyr::filter(!is.na(.data$log_file))
    expect_identical(nrow(db_logs_with_log_file), 1)

    # Check database log output
    logs <- get_table(conn, "test.SCDB_logs") |> dplyr::collect()

    # The logs should have specified data types
    types <- c(
      "date" = "POSIXct",
      "catalog" = "character",
      "schema" = "character",
      "table" = "character",
      "n_insertions" = "numeric",
      "n_deactivations" = "numeric",
      "start_time" = "POSIXct",
      "end_time" = "POSIXct",
      "duration" = "character",
      "success" = "logical",
      "message" = "character"
    )

    if (inherits(conn, "SQLiteConnection")) {
      # purrr::map_if() returns a list, so the result is flattened back to a character vector afterwards
      types <- types |>
        purrr::map_if(~ identical(., "POSIXct"), "character") |> # SQLite does not support POSIXct
        purrr::map_if(~ identical(., "logical"), "numeric") |>   # SQLite does not support logical
        as.character()
    }

    # `types` is the set of column classes allowed in the collected log table
    checkmate::expect_data_frame(logs, types = types, nrows = 1)


    # We now attempt to do another update on the same date
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 100, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-03 09:00:00", logger = logger)

    # Even though we insert twice on the same date, we expect the data to be minimal (compacted)
    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    expect_identical(slice_time(target, "2022-10-03 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-03 09:00:00")),
                 nrow(mtcars))


    # We now attempt an update between these two updates
    .data <- mtcars |>
      dplyr::mutate(hp = dplyr::if_else(hp > 150, hp - 10, hp)) |>
      dplyr::copy_to(conn, df = _, name = unique_table_name())

    # This should fail if we do not specify "enforce_chronological_order = FALSE"
    expect_error(
      update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00", logger = logger),
      regexp = "Given timestamp 2022-10-02 09:00:00 is earlier"
    )

    # But not with the variable set
    update_snapshot(.data, conn, "test.SCDB_tmp1", "2022-10-02 09:00:00",
                    logger = logger, enforce_chronological_order = FALSE)

    expect_identical(slice_time(target, "2022-10-01 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     mtcars |> dplyr::arrange(wt, qsec) |> tibble::tibble())
    expect_equal(nrow(slice_time(target, "2022-10-01 09:00:00")),
                 nrow(mtcars))

    expect_identical(slice_time(target, "2022-10-02 09:00:00") |>
                       dplyr::select(!c("from_ts", "until_ts", "checksum")) |>
                       dplyr::collect() |>
                       dplyr::arrange(wt, qsec),
                     .data |> dplyr::collect() |> dplyr::arrange(wt, qsec))
    expect_equal(nrow(slice_time(target, "2022-10-02 09:00:00")),
                 nrow(mtcars))


    # Drop the target table so the next scenarios start from a non-existing table
    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))
    expect_false(table_exists(conn, "test.SCDB_tmp1"))

    # Check deletion of redundant rows
    t0 <- data.frame(col1 = c("A", "B"),      col2 = c(NA_real_, NA_real_))
    t1 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1,        NA_real_, NA_real_))
    t2 <- data.frame(col1 = c("A", "B", "C"), col2 = c(1,        2,        3))

    # Copy t0, t1, and t2 to conn
    t0 <- dplyr::copy_to(conn, t0, name = id("test.SCDB_t0", conn), overwrite = TRUE, temporary = FALSE)
    t1 <- dplyr::copy_to(conn, t1, name = id("test.SCDB_t1", conn), overwrite = TRUE, temporary = FALSE)
    t2 <- dplyr::copy_to(conn, t2, name = id("test.SCDB_t2", conn), overwrite = TRUE, temporary = FALSE)


    # After each update, the current state of the target must equal the snapshot just written
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger)
    expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    update_snapshot(t1, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger)
    expect_identical(dplyr::collect(t1) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    # Note: t2 is written on the same timestamp as t1
    update_snapshot(t2, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger)
    expect_identical(dplyr::collect(t2) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    # Expected stored rows: the distinct rows of t0, t1 and t2 except the last one after sorting.
    # That row is (C, NA) from t1, which shares its timestamp with t2 and is expected not to remain.
    t <- list(t0, t1, t2) |>
      purrr::reduce(dplyr::union) |>
      dplyr::collect() |>
      dplyr::mutate(col2 = as.character(col2)) |>
      dplyr::arrange(col1, col2) |>
      utils::head(5)

    # slice_ts = NULL is used to retrieve the full history rather than a single time slice
    t_ref <- get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |>
      dplyr::select(!any_of(c("from_ts", "until_ts", "checksum"))) |>
      dplyr::collect() |>
      dplyr::mutate(col2 = as.character(col2)) |>
      dplyr::arrange(col1, col2)

    expect_identical(t, t_ref)

    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))



    # Check non-chronological insertion
    update_snapshot(t0, conn, "test.SCDB_tmp1", "2022-01-01", logger = logger)
    expect_identical(dplyr::collect(t0) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    update_snapshot(t2, conn, "test.SCDB_tmp1", "2022-03-01", logger = logger)
    expect_identical(dplyr::collect(t2) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1")) |> dplyr::arrange(col1))

    # t1 is inserted between the two existing timestamps, so it is compared against the 2022-02-01 slice
    update_snapshot(t1, conn, "test.SCDB_tmp1", "2022-02-01", logger = logger, enforce_chronological_order = FALSE)
    expect_identical(dplyr::collect(t1) |> dplyr::arrange(col1),
                     dplyr::collect(get_table(conn, "test.SCDB_tmp1", slice_ts = "2022-02-01")) |> dplyr::arrange(col1))

    # Expected full history, with the validity interval of every stored row
    t_ref <-
      tibble::tibble(col1     = c("A",          "B",          "A",          "C",          "B",          "C"),
                     col2     = c(NA_real_,     NA_real_,     1,            NA_real_,     2,            3),
                     from_ts  = c("2022-01-01", "2022-01-01", "2022-02-01", "2022-02-01", "2022-03-01", "2022-03-01"),
                     until_ts = c("2022-02-01", "2022-03-01", NA,           "2022-03-01", NA,           NA))

    # Timestamps are converted to strings so the comparison does not depend on the backend's timestamp type
    expect_identical(get_table(conn, "test.SCDB_tmp1", slice_ts = NULL) |>
                       dplyr::select(!"checksum") |>
                       dplyr::collect() |>
                       dplyr::mutate(from_ts  = strftime(from_ts),
                                     until_ts = strftime(until_ts)) |>
                       dplyr::arrange(col1, from_ts),
                     t_ref |>
                       dplyr::arrange(col1, from_ts))

    # Clean up: the log file, the target table and the connection
    if (file.exists(logger$log_realpath)) file.remove(logger$log_realpath)

    if (DBI::dbExistsTable(conn, id("test.SCDB_tmp1", conn))) DBI::dbRemoveTable(conn, id("test.SCDB_tmp1", conn))

    connection_clean_up(conn)
  }
})


# Verifies that update_snapshot() accepts the object returned by id() as its
# `db_table` argument (instead of a character string) on every test connection.
test_that("update_snapshot works with Id objects", {
  withr::local_options("SCDB.log_path" = NULL) # No file logging

  for (conn in get_test_conns()) {

    destination <- id("test.mtcars_modified", conn)

    # Logger with console output, database logging and warnings all switched off
    quiet_logger <- Logger$new(
      output_to_console = FALSE,
      timestamp = Sys.time(),
      db_table = "test.mtcars_modified",
      log_conn = NULL,
      log_table_id = NULL,
      warn = FALSE
    )

    # All steps run inside expect_no_error() so a failure in any of them is reported as an expectation failure
    expect_no_error({
      # Shuffle the disp column to get data that differs from plain mtcars
      shuffled <- dplyr::mutate(mtcars, disp = sample(mtcars$disp, nrow(mtcars)))
      staged <- dplyr::copy_to(dest = conn, df = shuffled, name = unique_table_name())

      update_snapshot(
        staged,
        conn = conn,
        db_table = destination,
        logger = quiet_logger,
        timestamp = format(Sys.time())
      )
    })

    connection_clean_up(conn)
  }
})


# Verifies that update_snapshot() rejects input it cannot handle:
#   * incoming data whose columns do not match the target table, and
#   * a target table that is not a historical table.
test_that("update_snapshot checks table formats", {

  withr::local_options("SCDB.log_path" = tempdir())

  for (conn in get_test_conns()) {

    historical_tbl <- dplyr::tbl(conn, id("__mtcars_historical", conn = conn))
    run_ts <- Sys.time()

    # Creating a logger with neither a file nor a database target is expected to warn
    expect_warning(
      silent_logger <- Logger$new(log_path = NULL, log_table_id = NULL, output_to_console = FALSE),                     # nolint: implicit_assignment_linter
      "NO file or database logging will be done."
    )

    # Test columns not matching
    incomplete_tbl <- mtcars |>
      dplyr::select(!"mpg") |>
      dplyr::copy_to(conn, df = _, name = "mtcars_broken", overwrite = TRUE)

    expect_error(
      update_snapshot(.data = incomplete_tbl, conn = conn, db_table = historical_tbl,
                      timestamp = run_ts, logger = silent_logger),
      "Columns do not match!"
    )

    # Remove any log files for this timestamp from the configured log path
    stale_logs <- list.files(
      getOption("SCDB.log_path"),
      pattern = format(run_ts, "^%Y%m%d.%H%M"),
      full.names = TRUE
    )
    file.remove(stale_logs)

    # Test target table not being a historical table
    expect_error(
      update_snapshot(
        .data = dplyr::tbl(conn, id("__mtcars", conn = conn)),
        conn = conn,
        db_table = id("__mtcars", conn = conn),
        timestamp = run_ts,
        logger = silent_logger
      ),
      "Table does not seem like a historical table"
    )

    connection_clean_up(conn)
  }
})


# Verifies that update_snapshot() can move data between two different
# connections and that the column classes survive the round trip:
#   1. from a local SQLite database to each test connection, and
#   2. from each test connection to a local SQLite database.
test_that("update_snapshot works with across connection", {
  skip_if_not_installed("RSQLite")

  withr::local_options("SCDB.log_path" = NULL) # No file logging

  # One-row summary holding the first class of every column of `df`
  class_signature <- function(df) {
    dplyr::summarise(df, dplyr::across(tidyselect::everything(), ~ class(.)[1]))
  }

  # Test a data transfer from a local SQLite to the test connection
  sqlite_source <- DBI::dbConnect(RSQLite::SQLite())

  # Create a table for the tests
  mtcars_modified <- dplyr::mutate(mtcars, name = rownames(mtcars))

  # The reference signature does not change, so it is computed once
  expected_signature <- class_signature(mtcars_modified)

  # Copy table to the source
  staged <- dplyr::copy_to(dest = sqlite_source, df = mtcars_modified, name = unique_table_name())

  # For each conn, we test if update_snapshot preserves data types
  for (remote_conn in get_test_conns()) {

    destination <- id("test.mtcars_modified", remote_conn)
    if (DBI::dbExistsTable(remote_conn, destination)) {
      DBI::dbRemoveTable(remote_conn, destination)
    }

    null_logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(staged, conn = remote_conn, db_table = destination,
                      logger = null_logger, timestamp = format(Sys.time()))
    )

    # Check that if we collect the table, the signature will match the original
    collected <- dplyr::collect(get_table(remote_conn, destination))
    expect_identical(as.data.frame(class_signature(collected)), expected_signature)

    # Clean up the target table, the connection and the logger before the next connection
    DBI::dbRemoveTable(remote_conn, destination)
    connection_clean_up(remote_conn)
    rm(null_logger)
    invisible(gc())
  }
  connection_clean_up(sqlite_source)


  ## Now we test the reverse transfer

  # Test a data transfer from the test connection to a local SQLite
  sqlite_target <- DBI::dbConnect(RSQLite::SQLite())

  # For each conn, we test if update_snapshot preserves data types
  for (remote_conn in get_test_conns()) {

    staged <- dplyr::copy_to(dest = remote_conn, df = mtcars_modified, name = unique_table_name())

    destination <- id("mtcars_modified", sqlite_target)
    if (DBI::dbExistsTable(sqlite_target, destination)) {
      DBI::dbRemoveTable(sqlite_target, destination)
    }

    null_logger <- LoggerNull$new()

    # Check we can transfer without error
    expect_no_error(
      update_snapshot(staged, conn = sqlite_target, db_table = destination,
                      logger = null_logger, timestamp = format(Sys.time()))
    )

    # Check that if we collect the table, the signature will match the original
    collected <- dplyr::collect(get_table(sqlite_target, destination))
    expect_identical(as.data.frame(class_signature(collected)), expected_signature)

    # Clean up the source connection and the logger before the next connection
    connection_clean_up(remote_conn)
    rm(null_logger)
    invisible(gc())
  }
  connection_clean_up(sqlite_target)
})

# Try the SCDB package in your browser
#
# Any scripts or data that you put into this service are public.
#
# SCDB documentation built on Oct. 4, 2024, 1:09 a.m.