R/insert_into_returning.R

#' Insert values into table returning the primary key
#'
#' Checks whether the table already has values matching the key before
#' inserting; if it does, the existing values are returned rather than
#' inserting a duplicate.
#'
#' @param con Connection to db to insert values into.
#' @param table Name of table to insert values into.
#' @param d Data frame of data to be inserted into db.
#' @param key Columns to check for existing entries in the database. If
#' adding a row to a table with unique key constraints, use this to return
#' the existing entry with the same values instead of inserting a duplicate.
#' @param ret The fields to return. Defaults to the key columns if key is
#' given, otherwise "id".
#'
#' @return Data frame of the requested columns for the inserted (or existing)
#' rows.
#'
#' @keywords internal
insert_into_returning <- function(con, table, d, key = NULL, ret = NULL) {
  UseMethod("insert_into_returning")
}
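
## A minimal usage sketch (illustrative only; the 'people' table and its
## columns are hypothetical, not part of this package):
##
##   con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
##   DBI::dbExecute(con,
##     "CREATE TABLE people (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
##   d <- data.frame(name = c("alice", "bob"), stringsAsFactors = FALSE)
##   insert_into_returning(con, "people", d, key = "name", ret = "id")
##
## This dispatches to the SQLiteConnection method below and returns a data
## frame with one 'id' per row of 'd'; re-running it with the same 'd' would
## return the existing ids rather than inserting duplicates.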

insert_into_returning.SQLiteConnection <- function(con, table, d, key = NULL,
                                                   ret = NULL) {
  ret <- ret %||% (if (length(key) >= 1L) key else "id")
  insert1 <- function(i) {
    ## Insert one row at a time and strip any NA columns before insert. NA
    ## columns will instead use default null representation for that column
    ## in the database, or use an autogenerated value.
    x <- d[i, , drop = FALSE]
    x <- x[!vlapply(x, is.na)]
    ## We may be asked to insert an 'empty' row - this is a row where we want
    ## the values of autogenerated columns but don't have any other columns
    ## to set.
    if (ncol(x) > 0) {
      result <- sqlite_insert_row(con, table, x, key, ret)
    } else {
      result <- sqlite_insert_empty_row(con, table, ret)
    }
    result
  }

  rows <- lapply(seq_len(nrow(d)), insert1)
  do.call(rbind.data.frame, rows)
}
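
## Note on NA handling (illustrative, hypothetical columns): a row such as
##   data.frame(name = "alice", age = NA_integer_)
## is inserted with only the 'name' column; 'age' is stripped in insert1
## above, so the database default (or NULL) is used for it.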

sqlite_insert_row <- function(con, table, x, key, ret) {
  sql <- c(sprintf("INSERT INTO %s", table),
           sprintf("  (%s)", paste(names(x), collapse = ", ")),
           "VALUES",
           sprintf("  (%s)", paste0("$", seq_along(x), collapse = ", "))
  )
  sql <- paste(sql, collapse = "\n")
  ## If we don't need to check any columns or if the columns we've been told to
  ## check do not exist within the data we're importing then go ahead and add
  ## the new data.
  if (is.null(key) || !(any(key %in% names(x)))) {
    result <- sqlite_execute_query(con, sql, table, ret, x)
  } else {
    ## Try and retrieve first:
    sql_get <- c(
      sprintf("SELECT %s FROM %s WHERE", paste(ret, collapse = ", "), table),
      paste(sprintf("%s = $%d", key, seq_along(key)), collapse = " AND "))
    result <- DBI::dbGetQuery(con, paste(sql_get, collapse = "\n"),
                              unname(x[key]))
    if (nrow(result) == 0L) {
      result <- sqlite_execute_query(con, sql, table, ret, x)
    }
  }
  result
}
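
## For illustration (hypothetical table and columns): with
## x = data.frame(name = "alice", age = 42L), table = "people", key = "name"
## and ret = "id", the insert statement built above is
##
##   INSERT INTO people
##     (name, age)
##   VALUES
##     ($1, $2)
##
## and the prior existence check is
##
##   SELECT id FROM people WHERE
##   name = $1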

#' Insert an empty row and return desired columns.
#'
#' @param con Connection to the DB.
#' @param table The table to add the row to.
#' @param ret Vector of column names to be returned.
#'
#' @return Data frame with the newly inserted values for the specified columns.
#'
#' @keywords internal
sqlite_insert_empty_row <- function(con, table, ret) {
  sql <- sprintf("INSERT INTO %s DEFAULT VALUES", table)
  sqlite_execute_query(con, sql, table, ret)
}

#' Execute a SQL insert query, returning desired columns from the new row.
#'
#' @param con Connection to the DB.
#' @param sql The SQL insert query.
#' @param table The table to insert data into.
#' @param ret Vector of column names to be returned.
#' @param x Params for the SQL query, if any.
#'
#' @return Data frame with the newly inserted values for the specified columns.
#'
#' @keywords internal
sqlite_execute_query <- function(con, sql, table, ret, x = NULL) {
  ## TODO: What happens here if we insert but the table has no rowid?
  ## see https://www.sqlite.org/withoutrowid.html for ref
  if (is.null(x)) {
    DBI::dbExecute(con, sql)
  } else {
    DBI::dbExecute(con, sql, unname(x))
  }
  ## The new row has been inserted above; find the rowid of the last inserted
  ## row, then use it to locate the new entry in the database and pull out the
  ## requested columns.
  rowid_last_entry <- DBI::dbGetQuery(con, "SELECT last_insert_rowid()")[1, 1]
  get_columns_sql <- sprintf("SELECT %s FROM %s WHERE rowid = $1",
                             paste(ret, collapse = ", "),
                             table)
  DBI::dbGetQuery(con, get_columns_sql, rowid_last_entry)
}
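
## Illustration of the two-step lookup (hypothetical table): after the insert
## is executed, something like
##
##   SELECT last_insert_rowid()
##
## gives the rowid of the new row, and
##
##   SELECT id FROM people WHERE rowid = $1
##
## reads back the requested columns for that row.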

insert_into_returning.PqConnection <- function(con, table, d, key = NULL,
                                               ret = NULL) {
  ret <- ret %||% (if (length(key) >= 1L) key else "id")
  insert1 <- function(i) {
    ## Insert one row at a time and strip any NA columns before insert. NA
    ## columns will instead use default null representation for that column
    ## in the database, or use an autogenerated value.
    x <- d[i, , drop = FALSE]
    x <- x[!vlapply(x, is.na)]
    ## We may be asked to insert an 'empty' row - this is a row where we want
    ## the values of autogenerated columns but don't have any other columns
    ## to set.
    if (ncol(x) > 0) {
      result <- postgres_insert(con, table, x, key, ret)
    } else {
      result <- postgres_insert_empty_row(con, table, ret)
    }
    ## Postgres returns bit64 integers by default; convert them to plain
    ## integers so the returned data is consistent between Postgres and
    ## SQLite.
    result <- convert_bit64_columns(result)
  }

  rows <- lapply(seq_len(nrow(d)), insert1)
  do.call(rbind.data.frame, rows)
}

postgres_insert <- function(con, table, x, key, ret) {
  sql <- c(sprintf("INSERT INTO %s", table),
           sprintf("  (%s)", paste(names(x), collapse = ", ")),
           "VALUES",
           sprintf("  (%s)", paste0("$", seq_along(x), collapse = ", ")),
           sprintf("RETURNING %s", paste(ret, collapse = ", ")))
  sql <- paste(sql, collapse = "\n")

  ## If we don't need to check any columns or if the columns we've been told to
  ## check do not exist within the data we're importing then go ahead and add
  ## the new data.
  if (is.null(key) || !(any(key %in% names(x)))) {
    result <- DBI::dbGetQuery(con, sql, unname(x))
  } else {
    ## Try and retrieve first:
    sql_get <- c(
      sprintf("SELECT %s FROM %s WHERE", paste(ret, collapse = ", "), table),
      paste(sprintf("%s = $%d", key, seq_along(key)), collapse = " AND "))
    result <- DBI::dbGetQuery(con, paste(sql_get, collapse = "\n"),
                              unname(x[key]))
    if (nrow(result) == 0L) {
      result <- DBI::dbGetQuery(con, sql, unname(x))
    }
  }
  result
}
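
## For illustration (hypothetical table and columns): with
## x = data.frame(name = "alice"), table = "people" and ret = "id", the
## statement built above is
##
##   INSERT INTO people
##     (name)
##   VALUES
##     ($1)
##   RETURNING id
##
## so Postgres hands back the new id directly and no second query is needed.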


#' Insert an empty row and return desired columns.
#'
#' @param con Connection to the DB.
#' @param table The table to add the row to.
#' @param ret Vector of column names to be returned.
#'
#' @return Data frame with the newly inserted values for the specified columns.
#'
#' @keywords internal
postgres_insert_empty_row <- function(con, table, ret) {
  ## In this case we want to insert a row populating only the autogenerated
  ## columns, so insert an empty row and rely on the database defaults.
  sql <- sprintf("INSERT INTO %s DEFAULT VALUES RETURNING %s",
                 table,
                 paste(ret, collapse = ", "))
  DBI::dbGetQuery(con, sql)
}


#' Convert any bit64 type columns to integers.
#'
#' Postgres returns bit64 types for integer fields.
#'
#' @param data The data frame with columns to be converted.
#'
#' @return Data frame with any bit64 columns converted to integers.
#'
#' @keywords internal
convert_bit64_columns <- function(data) {
  convert <- function(col) {
    if (bit64::is.integer64(col)) {
      as.integer(col)
    } else {
      col
    }
  }
  data[] <- lapply(data, convert)
  data
}
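
## Example (a sketch; requires the bit64 package):
##
##   d <- data.frame(id = bit64::as.integer64(1:3), name = c("a", "b", "c"))
##   str(convert_bit64_columns(d))
##
## The 'id' column becomes a plain integer vector; 'name' is left unchanged.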