#' Update a historical table
#'
#' @description
#' `update_snapshot()` makes it easy to create and update a historical data table on a remote (SQL) server.
#' The function takes the data (`.data`) as it looks at a given point in time (`timestamp`) and then updates
#' (or creates) a remote table identified by `db_table`.
#' This update only stores the changes between the new data (`.data`) and the data currently stored on the remote.
#' This way, the data can be reconstructed as it looked at any point in time while taking as little space as possible.
#'
#' See `vignette("basic-principles")` for further introduction to the function.
#'
#' @details
#' The most common use case is having consecutive snapshots of a dataset and wanting to store the changes between
#' them. If you have a special case where you want to insert data that is not consecutive, you can set
#' `enforce_chronological_order` to `FALSE`. This allows you to insert data with a timestamp earlier than the
#' latest time stamp in the table (see the sketch below).
#'
#' If you make multiple updates in a single day and use `Date()` rather than `POSIXct()` as your time stamp, you
#' may end up with records where `from_ts` and `until_ts` are equal. These records are not normally accessible
#' with `get_table()`, and you may want to prevent them by setting `collapse_continuous_records = TRUE`.
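#'
#' As a minimal sketch (assuming an open connection `conn`, a lazy table `data`, and an existing historical
#' table `"test.mtcars"`), back-filling an older snapshot could look like:
#'
#' ```
#' update_snapshot(
#'   data,
#'   conn = conn,
#'   db_table = "test.mtcars",
#'   timestamp = as.POSIXct("2020-01-01 09:00:00"),
#'   enforce_chronological_order = FALSE
#' )
#' ```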
#'
#' @template .data
#' @template conn
#' @template db_table
#' @param timestamp (`POSIXct(1)`, `Date(1)`, or `character(1)`)\cr
#' The timestamp describing the data being processed (not the current time).
#' @template filters
#' @param message (`character(1)`)\cr
#' A message to add to the log-file (useful for supplying metadata to the log).
#' @param tic (`POSIXct(1)`)\cr
#' A timestamp when computation began. If not supplied, it will be created at call-time
#' (used to more accurately convey the runtime of the update process).
#' @param logger (`Logger(1)`)\cr
#' A configured logging object. If none is given, one is initialized with default arguments.
#' @param enforce_chronological_order (`logical(1)`)\cr
#' Are updates allowed if they are chronologically earlier than the latest update?
#' @param collapse_continuous_records (`logical(1)`)\cr
#' Should records where the `from_ts` and `until_ts` time stamps are equal be collapsed (deleted)?
#' Forced `TRUE` when `enforce_chronological_order` is `FALSE`.
#' @return
#' No return value, called for side effects.
#' @examplesIf requireNamespace("RSQLite", quietly = TRUE)
#' conn <- get_connection()
#'
#' data <- dplyr::copy_to(conn, mtcars)
#'
#' # Copy the first 3 records
#' update_snapshot(
#'   head(data, 3),
#'   conn = conn,
#'   db_table = "test.mtcars",
#'   timestamp = Sys.time()
#' )
#'
#' # Update with the first 5 records
#' update_snapshot(
#'   head(data, 5),
#'   conn = conn,
#'   db_table = "test.mtcars",
#'   timestamp = Sys.time()
#' )
#'
#' dplyr::tbl(conn, "test.mtcars")
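#'
#' # Records that are still active have no until_ts set; a quick look at the raw table
#' # (get_table() from this package is the intended interface for reading historical tables):
#' dplyr::filter(dplyr::tbl(conn, "test.mtcars"), is.na(until_ts))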
#'
#' close_connection(conn)
#' @seealso filter_keys
#' @importFrom rlang .data
#' @export
update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, message = NULL, tic = Sys.time(),
                            logger = NULL,
                            enforce_chronological_order = TRUE,
                            collapse_continuous_records = FALSE) {
  # Check arguments
  checkmate::assert_class(.data, "tbl_dbi")
  checkmate::assert_class(conn, "DBIConnection")
  assert_dbtable_like(db_table)
  assert_timestamp_like(timestamp)
  checkmate::assert_class(filters, "tbl_dbi", null.ok = TRUE)
  checkmate::assert_character(message, null.ok = TRUE)
  assert_timestamp_like(tic)
  checkmate::assert_multi_class(logger, "Logger", null.ok = TRUE)
  checkmate::assert_logical(enforce_chronological_order)
  checkmate::assert_logical(collapse_continuous_records)

  ### Create the target table if it does not exist
  # Retrieve an Id from any valid db_table input to correctly create a missing table
  db_table_id <- id(db_table, conn)

  if (table_exists(conn, db_table_id)) {
    # Obtain a lock on the table
    if (!lock_table(conn, db_table_id, schema = get_schema(db_table_id))) {
      stop("A lock could not be obtained on the table", call. = FALSE)
    }
    db_table <- dplyr::tbl(conn, db_table_id)
  } else {
    db_table <- create_table(dplyr::collect(utils::head(.data, 0)), conn, db_table_id, temporary = FALSE)
  }
  ### Initialize logger
  if (is.null(logger)) {
    logger <- Logger$new(
      db_table = db_table_id,
      log_conn = conn,
      timestamp = timestamp,
      start_time = tic
    )
  }

  logger$log_info("Started", tic = tic) # Use input time in log

  # Add message to log (if given)
  if (!is.null(message)) {
    logger$log_to_db(message = message)
    logger$log_info("Message:", message, tic = tic)
  }
  ### Check incoming data
  if (!is.historical(db_table)) {
    # Release table lock
    unlock_table(conn, db_table_id, get_schema(db_table_id))

    logger$log_to_db(success = FALSE, end_time = !!db_timestamp(tic, conn))
    logger$log_error("Table does not seem like a historical table", tic = tic) # Use input time in log
  }

  if (!setequal(colnames(.data),
                colnames(dplyr::select(db_table, !c("checksum", "from_ts", "until_ts"))))) {
    # Release table lock
    unlock_table(conn, db_table_id, get_schema(db_table_id))

    logger$log_to_db(success = FALSE, end_time = !!db_timestamp(tic, conn))
    logger$log_error(
      "Columns do not match!\n",
      "Table columns:\n",
      toString(colnames(dplyr::select(db_table, !tidyselect::any_of(c("checksum", "from_ts", "until_ts"))))),
      "\nInput columns:\n",
      toString(colnames(.data)),
      tic = tic # Use input time in log
    )
  }

  logger$log_info("Parsing data for table", as.character(db_table_id), "started", tic = tic) # Use input time in log
  logger$log_info("Given timestamp for table is", timestamp, tic = tic) # Use input time in log
  ### Check for current update status
  db_latest <- db_table %>%
    dplyr::summarize(max(.data$from_ts, na.rm = TRUE)) %>%
    dplyr::pull() %>%
    as.character() %>%
    dplyr::coalesce("1900-01-01 00:00:00")

  # Convert timestamp to character to prevent inconsistent R behavior with date/timestamps
  timestamp <- strftime(timestamp)
  db_latest <- strftime(db_latest)

  if (enforce_chronological_order && timestamp < db_latest) {
    # Release the table lock
    unlock_table(conn, db_table_id, get_schema(db_table_id))

    logger$log_to_db(success = FALSE, end_time = !!db_timestamp(tic, conn))
    logger$log_error("Given timestamp", timestamp, "is earlier than latest",
                     "timestamp in table:", db_latest, tic = tic) # Use input time in log
  }
  ### Filter and compute checksums for incoming data
  .data <- .data %>%
    dplyr::ungroup() %>%
    filter_keys(filters) %>%
    dplyr::select(colnames(dplyr::select(db_table, !tidyselect::any_of(c("checksum", "from_ts", "until_ts")))))

  # Copy to the target connection if needed
  if (!identical(dbplyr::remote_con(.data), conn)) {
    .data <- dplyr::copy_to(conn, .data, name = unique_table_name("SCDB_update_snapshot_input"))
    defer_db_cleanup(.data)
  }

  # Once we ensure .data is on the same connection as the target, we compute the checksums
  .data <- digest_to_checksum(.data, col = "checksum")
  if (!inherits(conn, "SQLiteConnection")) .data <- dplyr::compute(.data) # SQLite was computed in digest_to_checksum
  defer_db_cleanup(.data)
  ### Determine the next timestamp in the data (can be NA if none is found)
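  # `next_timestamp` becomes the `until_ts` of the records inserted below; when no later snapshot exists,
  # it is NA and the newly inserted records remain active.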
  next_timestamp <- min(
    db_table %>%
      dplyr::filter(.data$from_ts > !!db_timestamp(timestamp, conn)) %>%
      dplyr::summarize(next_timestamp = min(.data$from_ts, na.rm = TRUE)) %>%
      dplyr::pull("next_timestamp"),
    db_table %>%
      dplyr::filter(.data$until_ts > !!db_timestamp(timestamp, conn)) %>%
      dplyr::summarize(next_timestamp = min(.data$until_ts, na.rm = TRUE)) %>%
      dplyr::pull("next_timestamp")
  ) %>%
    as.POSIXct(origin = "1970-01-01") %>%
    strftime()

  ### Consider only records valid at timestamp (and apply the filter if present)
  db_table <- slice_time(db_table, db_timestamp(timestamp, conn))

  # Apply filter to current records
  if (!is.null(filters) && !identical(dbplyr::remote_con(filters), conn)) {
    filters <- dplyr::copy_to(conn, filters, name = unique_table_name("SCDB_update_snapshot_filter"))
    defer_db_cleanup(filters)
  }
  db_table <- filter_keys(db_table, filters)
  # Determine which records to remove from and add to the DB
  # Records will be removed if their checksum no longer exists in the new data
  # Records will be added if their checksum does not exist in the current data
  # Generate SQL at a lower level than tidyverse to get the affected rows without computing
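  # For example, a row whose checksum exists in `db_table` but not in `.data` has its `until_ts` set to
  # `timestamp`, while a row whose checksum exists only in `.data` is inserted with `from_ts = timestamp`.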
  slice_ts <- db_timestamp(timestamp, conn)

  currently_valid_checksums <- db_table %>%
    dplyr::select("checksum")

  ## Deactivation
  logger$log_info("Deactivating records")

  checksums_to_deactivate <- dplyr::setdiff(currently_valid_checksums, dplyr::select(.data, "checksum"))

  sql_deactivate <- dbplyr::sql_query_update_from(
    con = conn,
    table = dbplyr::as.sql(db_table_id, con = conn),
    from = dbplyr::sql_render(checksums_to_deactivate),
    by = "checksum",
    update_values = c("until_ts" = slice_ts)
  )

  logger$log_info("After to_remove")

  # Commit changes to DB
  rs_deactivate <- DBI::dbSendQuery(conn, sql_deactivate)
  n_deactivations <- DBI::dbGetRowsAffected(rs_deactivate)
  DBI::dbClearResult(rs_deactivate)

  logger$log_to_db(n_deactivations = !!n_deactivations)
  logger$log_info("Deactivate records count:", n_deactivations)
  ## Insertion
  logger$log_info("Adding new records")

  records_to_insert <- dbplyr::build_sql(
    con = conn,
    "SELECT *, ", db_timestamp(timestamp, conn), " AS ", dbplyr::ident("from_ts"), ", ",
    db_timestamp(next_timestamp, conn), " AS ", dbplyr::ident("until_ts"),
    " FROM ", dbplyr::remote_table(.data),
    " WHERE ", dbplyr::remote_table(.data), ".", dbplyr::ident("checksum"),
    " NOT IN (", dbplyr::sql_render(currently_valid_checksums), ")"
  )

  sql_insert <- dbplyr::sql_query_insert(
    con = conn,
    table = dbplyr::as.sql(db_table_id, con = conn),
    from = records_to_insert,
    insert_cols = c(colnames(.data), "from_ts", "until_ts"),
    by = c("checksum", "from_ts"),
    conflict = "ignore"
  )

  logger$log_info("After to_add")

  # Commit changes to DB
  rs_insert <- DBI::dbSendQuery(conn, sql_insert)
  n_insertions <- DBI::dbGetRowsAffected(rs_insert)
  DBI::dbClearResult(rs_insert)

  logger$log_to_db(n_insertions = !!n_insertions)
  logger$log_info("Insert records count:", n_insertions)
  # If chronological order is not enforced, a logical record may be split across several rows
  # (the checksum is the same, and from_ts / until_ts are continuous).
  # We collapse these records here.
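  # For example, rows (checksum = X, from_ts = t1, until_ts = t2) and (checksum = X, from_ts = t2, until_ts = NA)
  # are combined into (checksum = X, from_ts = t1, until_ts = NA).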
  if (!enforce_chronological_order) {
    # First we identify the records with this stitching
    consecutive_rows <- dplyr::inner_join(
      dplyr::tbl(conn, db_table_id),
      dplyr::tbl(conn, db_table_id) %>% dplyr::select("checksum", "from_ts", "until_ts"),
      suffix = c("", ".p"),
      sql_on = paste(
        '"RHS"."checksum" = "LHS"."checksum" AND ',
        '("LHS"."until_ts" = "RHS"."from_ts" OR "LHS"."from_ts" = "RHS"."until_ts")'
      )
    )

    # If the record has the earlier from_ts, we use the until_ts of the other.
    # If the record has the later from_ts, we set until_ts equal to from_ts to trigger
    # clean up later in update_snapshot.
    consecutive_rows_fix <- consecutive_rows %>%
      dplyr::mutate("until_ts" = ifelse(.data$from_ts < .data$from_ts.p, .data$until_ts.p, .data$from_ts)) %>%
      dplyr::select(!tidyselect::ends_with(".p"))

    if (inherits(conn, "duckdb_connection")) {
      # For duckdb the lower level translation fails
      # dbplyr 2.5.0, duckdb 0.10.2
      consecutive_rows_fix <- dplyr::compute(consecutive_rows_fix)
      defer_db_cleanup(consecutive_rows_fix)
      n_consecutive <- nrow(consecutive_rows_fix) / 2

      dplyr::rows_update(
        x = dplyr::tbl(conn, db_table_id),
        y = consecutive_rows_fix,
        by = c("checksum", "from_ts"),
        unmatched = "ignore",
        in_place = TRUE
      )
    } else {
      sql_fix_consecutive <- dbplyr::sql_query_upsert(
        con = conn,
        table = dbplyr::as.sql(db_table_id, con = conn),
        from = dbplyr::sql_render(consecutive_rows_fix),
        by = c("checksum", "from_ts"),
        update_cols = "until_ts"
      )

      # Commit changes to DB
      rs_fix_consecutive <- DBI::dbSendQuery(conn, sql_fix_consecutive)
      n_consecutive <- DBI::dbGetRowsAffected(rs_fix_consecutive) / 2
      DBI::dbClearResult(rs_fix_consecutive)
    }

    logger$log_info("Doubly updated records removed:", n_consecutive)
  }
  # If several updates come in a single day, some records may have from_ts = until_ts.
  # Alternatively, the above handling of consecutive records will make records have from_ts = until_ts.
  # We remove these records here.
  if (collapse_continuous_records || !enforce_chronological_order) {
    redundant_rows <- dplyr::tbl(conn, db_table_id) %>%
      dplyr::filter(.data$from_ts == .data$until_ts) %>%
      dplyr::select("checksum", "from_ts")

    sql_fix_redundant <- dbplyr::sql_query_delete(
      con = conn,
      table = dbplyr::as.sql(db_table_id, con = conn),
      from = dbplyr::sql_render(redundant_rows),
      by = c("checksum", "from_ts")
    )

    # Commit changes to DB
    rs_fix_redundant <- DBI::dbSendQuery(conn, sql_fix_redundant)
    n_redundant <- DBI::dbGetRowsAffected(rs_fix_redundant) / 2
    DBI::dbClearResult(rs_fix_redundant)

    logger$log_info("Continuous records collapsed:", n_redundant)
  }
  # Clean up
  toc <- Sys.time()
  logger$finalize_db_entry()
  logger$log_info("Finished processing for table", as.character(db_table_id), tic = toc)

  # Release table lock
  unlock_table(conn, db_table_id, get_schema(db_table_id))

  return(NULL)
}