## R/automatic_load.R
##
## Defines functions map_values update_child_tables strip_referenced_key_columns get_old_key_values dettl_auto_load
##
## Documented in dettl_auto_load update_child_tables

#' Automatic load function for append mode imports.
#'
#' Walks over the transformed data in upload order and appends every data
#' frame to the database table of the same name. When a table holds a serial
#' key that is referenced by a foreign key constraint, the serial key columns
#' are removed before the insert and the insert returns the key values the
#' database assigned to the new rows. Every table in the remaining data which
#' uses that key as a foreign key is then rewritten so that the previous
#' values are replaced by the returned actual values.
#'
#' This is intended to be called from within a custom load function when the
#' data should be loaded in the automatic way but some special edge cases need
#' custom handling before or after running the automatic load.
#'
#' @param transformed_data The list of transformed data frames to append
#' to tables in the database.
#' @param con Connection to the database to add data to.
#'
#' @return Invisibly returns TRUE.
#'
#' @export
#' @examples
#' path <- dettl:::prepare_test_import(
#'   system.file("examples", "person_information", package = "dettl"),
#'   system.file("examples", "dettl_config.yml", package = "dettl")
#' )
#' import <- dettl::dettl(file.path(path, "person_information"), "test")
#' con <- import$get_connection()
#' data <- list("people" = data.frame(
#'   name = c("Alice", "Bob"),
#'   age = c(25, 43),
#'   height = c(175, 187),
#'   stringsAsFactors = FALSE))
#' dettl_auto_load(data, con)
dettl_auto_load <- function(transformed_data, con) {
  constraints <- ForeignKeyConstraints$new(con)
  ## The upload order is worked out once, from the data as it was passed in
  for (name in constraints$get_upload_order(transformed_data)) {
    message(sprintf("Updating %s (adding %d rows)",
                    name, nrow(transformed_data[[name]])))
    needs_key_rewrite <- constraints$used_as_foreign_key(name) &&
      constraints$has_serial(name)

    if (!needs_key_rewrite) {
      ## Plain append: nothing refers to a generated key of this table
      withCallingHandlers({
        ## DBI::dbWriteTable does not work with table name like
        ## schema_name.table_name. We need to create a DBI Id object
        ## to write to a table in a specific schema.
        name_parts <- strsplit(name, ".", fixed = TRUE)[[1]]
        table_id <- if (length(name_parts) == 1) {
          DBI::Id(table = name_parts)
        } else {
          DBI::Id(schema = name_parts[1], table = name_parts[2])
        }
        DBI::dbWriteTable(con, table_id, transformed_data[[name]],
                          append = TRUE)
      },
      error = function(e) {
        stop(data_write_error(e$message, name, transformed_data[[name]]))
      })
      next
    }

    ## Only serial keys are stripped and replaced by DB generated values,
    ## as these are the ones autogenerated on upload
    referenced <- constraints$get_referenced_keys(name)
    serial_keys <- referenced[constraints$is_serial(name, referenced)]
    rows_to_insert <- strip_referenced_key_columns(transformed_data[[name]],
                                                   serial_keys)
    generated <- insert_into_returning(con, name, rows_to_insert,
                                       ret = serial_keys)
    for (key in names(generated)) {
      if (!constraints$is_serial_constraint(name, key, transformed_data)) {
        next
      }
      ## Propagate the generated key values to the tables which use this
      ## referenced key
      usages <- constraints$get_foreign_key_usages(name, key)
      previous_values <- get_old_key_values(transformed_data, name, key)
      transformed_data <- update_child_tables(
        transformed_data, usages, previous_values, generated[[key]], name)
    }
  }
  invisible(TRUE)
}

## Get the values currently held in a referenced key column.
##
## data:       named list of data frames, one per table.
## table_name: name of the table which holds the referenced key.
## col_name:   name of the referenced key column within that table.
##
## Returns the column as a plain vector. Throws an error if the table has no
## column called `col_name`.
get_old_key_values <- function(data, table_name, col_name) {
  table <- data[[table_name]]
  if (!(col_name %in% colnames(table))) {
    stop(sprintf(paste0(
      "Can't upload data, referenced key '%s' of table '%s' is missing but ",
      "is referenced by foreign key constraint used in data."),
      col_name, table_name))
  }
  ## Use `[[` rather than `[, col_name]`: the latter only drops to a vector
  ## for base data frames, a tibble would come back as a one column tibble
  ## and break the length and uniqueness checks done on the key values.
  table[[col_name]]
}

## Drop the referenced key columns from a data frame.
##
## Every column whose name appears in `referenced_keys` is removed; the
## result is always a data frame, even if only one column is left.
strip_referenced_key_columns <- function(data, referenced_keys) {
  is_key_column <- names(data) %in% referenced_keys
  data[, !is_key_column, drop = FALSE]
}

#' Update child tables using inserted foreign keys
#'
#' Goes through every table in the list and, where the table is listed as a
#' child and holds the foreign key column, swaps the old referenced key values
#' for the new ones. All other tables are passed through untouched.
#'
#' @param tables List of tables to be updated.
#' @param table_key_pair List of child tables and the foreign key field.
#' @param old_key_values Old values of referenced key, how foreign key is
#' currently identified in child table.
#' @param new_key_values New values of referenced key, what foreign key should
#' be updated to.
#' @param parent_table The name of the table just updated
#'
#' @return The updated tables
#'
#' @keywords internal
update_child_tables <- function(tables, table_key_pair, old_key_values,
                                new_key_values, parent_table) {
  remap_foreign_key <- function(child_name) {
    child <- tables[[child_name]]
    key_column <- table_key_pair[[child_name]]
    ## TODO: The table which has just been updated is skipped even if it is
    ## listed as a child. This means that if a table has a column with a
    ## foreign key constraint on another column within the same table (e.g. a
    ## hierarchy with a parent column which refers to the PK of the table
    ## itself) then that column will not be updated.
    ## We can't currently work around this as all new rows are added to the
    ## table in one go; doing so would require uploading row by row and then
    ## checking for updated autogenerated rows within the same table.
    if (is.null(key_column) || child_name == parent_table) {
      return(child)
    }
    current_values <- child[[key_column]]
    if (is.null(current_values)) {
      ## Table is a known child but the data does not carry the key column
      return(child)
    }
    child[[key_column]] <- unlist(
      map_values(current_values, old_key_values, new_key_values,
                 child_name, key_column),
      FALSE, FALSE)
    child
  }
  table_names <- names(tables)
  updated <- lapply(table_names, remap_foreign_key)
  names(updated) <- table_names
  updated
}

## Update any old values within data to new ones
##
## `old` and `new` are parallel vectors: an entry of `data` equal to the
## value at position i of `old` is replaced by the value at position i of
## `new`. Values must be unique in old and new and both must have the same
## length, otherwise an error is thrown. Entries of `data` which have no
## counterpart in `old`, including missing values, are returned as they are.
## `table_name` and `col_name` are only used to build the error message.
map_values <- function(data, old, new, table_name, col_name) {
  if (!(length(unique(old)) == length(old) &&
      length(unique(new)) == length(new) &&
      length(new) == length(old))) {
    stop(sprintf(paste0("Cannot map values for table '%s', column '%s'. ",
                        "Check if referenced keys are unique."),
                 table_name, col_name))
  }
  ## Single vectorised lookup of the position of each data value in old
  indices <- match(data, old)
  ## match() pairs NA with NA (and NaN with NaN); a missing value is never
  ## a key so make sure these are left unmapped
  indices[is.na(data)] <- NA_integer_
  mapped_values <- new[indices]
  ## If value has not been mapped then keep original
  na_values <- which(is.na(mapped_values))
  mapped_values[na_values] <- data[na_values]
  mapped_values
}
## vimc/dettl documentation built on Oct. 6, 2022, 2:13 p.m.