Nothing
# This file is part of 'writer' R package
#' @name write_table
#' @title Write from multiple sources to a database table
#' @description Provides unified syntax to write data from [dplyr::tbl()] (lazy
#' dplyr query via a DBI connection) or a [dplyr::sql()] or a [data.frame] to
#' a database table with modes such as: `create`, `append`, `insert`,
#' `update`, `upsert`, `patch`, `delete`, `overwrite`, `overwrite_schema`.
#'
#' @param x ( [dplyr::tbl()] or [dplyr::sql()] or [data.frame] ) The data to
#' write to the database. Input need not have any rows.
#' @param table_name ( `string` or `AsIs` or `Id` ) The name of the table to
#' write to. This has to be one among: string (typically
#' `"catalog.schema.table"`), Object of class "AsIs" generated by wrapping
#' `I()` (typically, `I("catalog.schema.table")`) or Object of class `Id`
#' generated by `DBI::Id` (typically, `DBI::Id("catalog", "schema",
#' "table")`).
#' @param mode ( `string` ) Writing mode. Possible values are one among:
#' `create`, `append`, `insert`, `update`, `upsert`, `patch`, `delete`,
#' `overwrite`, `overwrite_schema`.
#' @param con ( default: NULL ) DBI-connection object to use to write operation.
#' When con is `NULL` and source is a `tbl_lazy`, then DBI connection object
#' from the source tbl is used. When source is a `data.frame`, then `con`
#' should be provided.
#' @param verbose ( default: TRUE ) Whether the progress message should be shown
#' @param ... Arguments passed to specific function based on `mode`. See
#' details.
#' @returns When successful, returns the output table name as a string. Else,
#' throws an error with informative messages.
#'
#' @details The `DBI-dplyr-dbplyr` combination provides a great workflow to
#' handle database operations from R. When saving the output from analysis
#' notebooks or scripts, different functions need to be called based on the
#' type of the object we intend to write. `writer` package solves the problem
#' by exporting one generic `write_table` to handle multiple input types and
#' multiple modes. Further, `overwrite` and `overwrite_schema` refine the idea
#' of table overwrite so that schema of the table is not changed
#' inadvertently.
#'
#' ## Modes
#'
#' * `create`: Creates a new table only if table with same name does not exist and writes data.
#' * `append`: Appends data only if table exists and schema matches. See [dplyr::rows_append()].
#' * `insert`: Inserts data only if table exists and key values do not exist. See [dplyr::rows_insert()].
#' * `update`: Updates data only if table exists and key values match. See [dplyr::rows_update()] .
#' * `upsert`: Inserts or Updates only if table exists and depending on whether or not the key value already exists. See [dplyr::rows_upsert()].
#' * `patch`: Updates only missing values if table exists and key values match. See [dplyr::rows_patch()].
#' * `delete`: Deletes rows for matching key values if table exists. See [dplyr::rows_delete()]
#' * `overwrite`: Overwrites data only if table exists and schema matches.
#' * `overwrite_schema`: Creates a new table with data irrespective of whether table exists or not.
#'
#' ## Failures
#'
#' The failures are mostly due to unavailable or wrong sql translation to the
#' specific backend. Please raise issues in
#' [dbplyr](https://github.com/tidyverse/dbplyr/issues) repo.
#'
#' Errors are raised with a
#' [class](https://rlang.r-lib.org/reference/abort.html) (using
#' [rlang::abort()]). These are the error classes:
#'
#' * `error_input`: Related to wrong or unexpected input.
#' * `error_connection`: Raised when connection is inactive.
#' * `error_table`: Related to table existence or non-existence.
#' * `error_operation`: Related to core write operation.
#'
#' ## Permissions
#'
#' * `create`: create new table
#' * `append`, `insert`, `update`, `upsert`, `patch`, `delete`: modify existing table
#' * `overwrite`: modify existing table
#' * `overwrite_schema`: delete, create and rename table
#'
#' @examples
#' \dontrun{
#' #' Create an in-memory SQLite database connection
#' con = DBI::dbConnect(RSQLite::SQLite(), ":memory:")
#'
#' remove_new_table = function(){
#' df = data.frame(id = 1:3, name = c("Alice", "Bob", "Charlie"))
#' DBI::dbRemoveTable(con, "new_table", fail_if_missing = FALSE)
#' }
#'
#' create_new_table = function(){
#' df = data.frame(id = 1:3, name = c("Alice", "Bob", "Charlie"))
#' DBI::dbWriteTable(con, "new_table", df, overwrite = TRUE)
#' }
#'
#' #' Create a sample data.frame
#' df = data.frame(id = 1:3, name = c("Alice", "Bob", "Charlie"))
#' df
#'
#' #' Create a new table
#' write_table(df, "new_table", mode = "create", con = con)
#' dplyr::tbl(con, "new_table")
#'
#'
#' intermediate = dplyr::tbl(con, "new_table") |>
#' dplyr::filter(id >= 2)
#'
#' write_table(intermediate, "new_filtered_table", mode = "create")
#' dplyr::tbl(con, "new_filtered_table")
#'
#' #' Append data to an existing table
#' create_new_table()
#' append_df = data.frame(id = 4:5, name = c("Dave", "Eve"))
#'
#' append_df |>
#' write_table("new_table", mode = "append", con = con)
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' write_table(append_df, "append_table", mode = "create", con)
#' dplyr::tbl(con, "append_table")
#'
#' dplyr::tbl(con, "append_table") |>
#' write_table("new_table", mode = "append")
#' dplyr::tbl(con, "new_table")
#'
#' #' Insert data into an existing table, only if key values do not exist
#' create_new_table()
#' dplyr::tbl(con, "new_table")
#' insert_df = data.frame(id = 3:4, name = c("Dave", "Eve"))
#' insert_df
#'
#' insert_df |>
#' write_table("new_table",
#' mode = "insert",
#' con = con,
#' by = "id",
#' conflict = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' insert_df |>
#' write_table("insert_table", mode = "create", con = con)
#' dplyr::tbl(con, "insert_table")
#'
#' dplyr::tbl(con, "insert_table") |>
#' write_table("new_table",
#' mode = "insert",
#' by = "id",
#' conflict = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' #' Update data in an existing table, only if key values match
#' create_new_table()
#' update_df = data.frame(id = c(1, 3), name = c("Alicia", "Charles"))
#' update_df
#' write_table(update_df,
#' "new_table",
#' mode = "update",
#' con = con,
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' write_table(update_df, "update_table", mode = "create", con = con)
#' dplyr::tbl(con, "update_table")
#' write_table(dplyr::tbl(con, "update_table"),
#' "new_table",
#' mode = "update",
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' #' upsert
#' create_new_table()
#' upsert_df = data.frame(id = c(2, 6), name = c("Bobby", "Frank"))
#' upsert_df
#' write_table(upsert_df,
#' "new_table",
#' mode = "upsert",
#' con = con,
#' by = "id"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' write_table(upsert_df,
#' "upsert_table",
#' mode = "create",
#' con = con
#' )
#' dplyr::tbl(con, "upsert_table")
#' write_table(dplyr::tbl(con, "upsert_table"),
#' "new_table",
#' mode = "upsert"
#' )
#' dplyr::tbl(con, "new_table")
#'
#'
#' #' Patch data, updating only missing values
#' create_new_table()
#' patch_df = data.frame(id = c(1, 2), name = c("alice", NA))
#' patch_df
#' write_table(df_patch, "table_with_na", mode = "create", con)
#' dplyr::tbl(con, "table_with_na")
#' df
#' write_table(df,
#' "table_with_na",
#' mode = "patch",
#' con = con,
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "table_with_na")
#'
#' DBI::dbRemoveTable(con, "table_with_na")
#' write_table(df_patch, "table_with_na", mode = "create", con)
#' dplyr::tbl(con, "new_table")
#' write_table(dplyr::tbl(con, "new_table"),
#' "table_with_na",
#' mode = "patch",
#' con = con,
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "table_with_na")
#'
#' #' Delete rows for matching key values
#' create_new_table()
#' delete_df = data.frame(id = c(3, 4))
#' delete_df
#' write_table(df_delete,
#' "new_table",
#' mode = "delete",
#' con = con,
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' write_table(delete_df,
#' "delete_table",
#' mode = "create",
#' con = con
#' )
#' dplyr::tbl(con, "delete_table")
#' write_table(df_delete,
#' "new_table",
#' mode = "delete",
#' con = con,
#' unmatched = "ignore"
#' )
#' dplyr::tbl(con, "new_table")
#'
#' #' Overwrite data in an existing table, schema must match
#' create_new_table()
#' overwrite_df = data.frame(id = c(2, 6), name = c("Bobby", "Frank"))
#' overwrite_df
#' write_table(overwrite_df,
#' "new_table",
#' mode = "overwrite",
#' con = con
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' overwrite_df = data.frame(id = c(2, 6), name = c("Bobby", "Frank"))
#' overwrite_df
#' write_table(overwrite_df,
#' "new_table",
#' mode = "overwrite",
#' con = con
#' )
#' dplyr::tbl(con, "new_table")
#'
#' #' Overwrite schema
#' overwrite_schema_df = data.frame(id = c(2, 6),
#' name = c("Bobby", "Frank"),
#' age = c(30, 40)
#' )
#' write_table(overwrite_schema_df,
#' "new_table",
#' mode = "overwrite_schema",
#' con = con
#' )
#' dplyr::tbl(con, "new_table")
#'
#' create_new_table()
#' write_table(overwrite_schema_df,
#' "overwrite_schema_table",
#' mode = "overwrite_schema",
#' con = con
#' )
#' write_table(dplyr::tbl(con, "overwrite_schema_table"),
#' "new_table",
#' mode = "overwrite_schema",
#' con = con
#' )
#' dplyr::tbl(con, "new_table")
#'
#' #' Disconnect from the database
#' DBI::dbDisconnect(con)
#' }
#' @export
write_table = function(x, table_name, mode, con = NULL, verbose = TRUE,...){
UseMethod("write_table", x)
}
# method to `write_table` dplyr tbl object to database
#' @rdname write_table
#' @export
write_table.tbl_lazy = function(x,
table_name,
mode = c("create",
"append",
"insert",
"update",
"upsert",
"patch",
"delete",
"overwrite",
"overwrite_schema"
),
con = NULL,
verbose = TRUE,
...
){
# assertions and setup *******************************************************
rlang::arg_match(mode)
table_name = validate_table_name(table_name)
if (is.null(con)){
con = x$src$con
}
if (!DBI::dbIsValid(con)){
cli::cli_abort("connection is not active", class = "error_connection")
}
table_exists_flag = DBI::dbExistsTable(con, string_to_id(table_name))
if (mode == "create"){
if (table_exists_flag){
cli::cli_abort("Cannot create table {table_name} as it already exists.",
class = "error_table"
)
}
}
if (mode %in% c("append","insert","update",
"upsert", "patch", "delete", "overwrite"
)
){
if (!table_exists_flag){
cli::cli_abort("Cannot '{mode}' as table {table_name} does not exist.",
class = "error_table"
)
}
}
# start progress bar *********************************************************
if (verbose){
cli::cli_progress_step("Operation '{mode}': {table_name}", )
}
# Operation ******************************************************************
ops = switch(mode,
create = write_tbl_create(x, table_name, con, ...),
overwrite = write_tbl_overwrite(x, table_name, con, ...),
overwrite_schema = write_tbl_overwrite_schema(x, table_name, con, ...),
append = write_rows(x, table_name, con, "append", ...),
insert = write_rows(x, table_name, con, "insert", ...),
update = write_rows(x, table_name, con, "update", ...),
upsert = write_rows(x, table_name, con, "upsert", ...),
patch = write_rows(x, table_name, con, "patch", ...),
delete = write_rows(x, table_name, con, "delete", ...),
)
# handle exceptions **********************************************************
if (inherits(ops, "try-error")){
print(ops)
cli::cli_abort("Failure occured in '{mode}' to table: {table_name}",
class = "error_operation"
)
}
# return *********************************************************************
return(invisible(table_name))
}
# method to `write_table` dplyr sql object to database
#' @rdname write_table
#' @export
write_table.sql = function(x,
table_name,
mode = c("create",
"append",
"insert",
"update",
"upsert",
"patch",
"delete",
"overwrite",
"overwrite_schema"
),
con = NULL,
verbose = TRUE,
...
){
# assertions and setup *******************************************************
if (!inherits(con, "DBIConnection")){
cli::cli_inform("'con' is typically generated from `DBI::dbConnect`")
cli::cli_abort("'con' should be a `DBIConnection` object",
class = "error_input"
)
}
if (!DBI::dbIsValid(con)){
cli::cli_abort("connection is not active", class = "error_connection")
}
# validate sql input and call via tbl ****************************************
tbl_obj = try(dplyr::tbl(con, x), silent = TRUE)
if (inherits(tbl_obj, "try-error")){
cli::cli_warn("Possibly a invalid SQL statement")
cli::cli_abort("Unable to create a tbl object",
class = "error_tbl_create",
parent = tbl_obj
)
} else {
write_table.tbl_lazy(tbl_obj,
table_name = table_name,
mode = mode,
con = NULL,
...
)
}
}
# method to `write_table` data.frame to database
#' @rdname write_table
#' @export
write_table.data.frame = function(x,
table_name,
mode = c("create",
"append",
"insert",
"update",
"upsert",
"patch",
"delete",
"overwrite",
"overwrite_schema"
),
con = NULL,
verbose = TRUE,
...
){
# assertions and setup *******************************************************
rlang::arg_match(mode)
table_name = validate_table_name(table_name)
if (!inherits(con, "DBIConnection")){
cli::cli_inform("'con' is typically generated from `DBI::dbConnect`")
cli::cli_abort("'con' should be a `DBIConnection` object",
class = "error_input"
)
}
if (!DBI::dbIsValid(con)){
cli::cli_abort("connection is not active", class = "error_connection")
}
table_exists_flag = DBI::dbExistsTable(con, string_to_id(table_name))
if (mode == "create"){
if (table_exists_flag){
cli::cli_abort("Cannot create table {table_name} as it already exists.",
class = "error_table"
)
}
}
if (mode %in% c("append","insert","update",
"upsert", "patch", "delete", "overwrite"
)
){
if (!table_exists_flag){
cli::cli_abort("Cannot '{mode}' as table {table_name} does not exist.",
class = "error_table"
)
}
}
# start progress bar *********************************************************
if (verbose){
cli::cli_progress_step("Operation '{mode}': {table_name}", )
}
# Operation ******************************************************************
ops = switch(mode,
create = write_df_create(x, table_name, con, ...),
overwrite = write_df_overwrite(x, table_name, con, ...),
overwrite_schema = write_df_overwrite_schema(x, table_name, con, ...),
append = write_df_append(x, table_name, con, ...),
insert = write_rows(x, table_name, con, "insert", ...),
update = write_rows(x, table_name, con, "update", ...),
upsert = write_rows(x, table_name, con, "upsert", ...),
patch = write_rows(x, table_name, con, "patch", ...),
delete = write_rows(x, table_name, con, "delete", ...),
)
# handle exceptions **********************************************************
if (inherits(ops, "try-error")){
print(ops)
cli::cli_abort("Failure occured in '{mode}' to table: {table_name}",
class = "error_operation"
)
}
# return *********************************************************************
return(invisible(table_name))
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.