Nothing
#' Create Bulk API Job
#'
#' This function initializes a Job in the Salesforce Bulk API
#'
#' @importFrom lifecycle deprecated is_present deprecate_warn
#' @template operation
#' @template object_name
#' @template soql
#' @template external_id_fieldname
#' @template api_type
#' @param content_type \code{character}; being one of 'CSV', 'ZIP_CSV', 'ZIP_XML', or 'ZIP_JSON' to
#' indicate the type of data being passed to the Bulk APIs. For the Bulk 2.0 API the only
#' valid value (and the default) is 'CSV'.
#' @param concurrency_mode \code{character}; either "Parallel" or "Serial" that specifies
#' whether batches should be completed sequentially or in parallel. Use "Serial"
#' only if lock contentions persist with in "Parallel" mode. Note: this argument is
#' only used in the Bulk 1.0 API and will be ignored in calls using the Bulk 2.0 API.
#' @param column_delimiter \code{character}; indicating the column delimiter used for CSV job data.
#' The default value is COMMA. Valid values are: "BACKQUOTE", "CARET", "COMMA", "PIPE",
#' "SEMICOLON", and "TAB", but this package only accepts and uses "COMMA". Also,
#' note that this argument is only used in the Bulk 2.0 API and will be ignored
#' in calls using the Bulk 1.0 API.
#' @template control
#' @param ... arguments passed to \code{\link{sf_control}}
#' @template line_ending
#' @template verbose
#' @return A \code{tbl_df} parameters defining the created job, including id
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @examples
#' \dontrun{
#' # insert into Account
#' job_info <- sf_create_job_bulk(operation='insert', object_name='Account')
#'
#' # delete from Account
#' job_info <- sf_create_job_bulk(operation='delete', object_name='Account')
#'
#' # update into Account
#' job_info <- sf_create_job_bulk(operation='update', object_name='Account')
#'
#' # upsert into Account
#' job_info <- sf_create_job_bulk(operation='upsert',
#' externalIdFieldName='My_External_Id__c',
#' object_name='Account')
#' # insert attachments
#' job_info <- sf_create_job_bulk(operation='insert', object_name='Attachment')
#'
#' # query leads
#' job_info <- sf_create_job_bulk(operation='query', object_name='Lead')
#' }
#' @export
sf_create_job_bulk <- function(operation = c("insert", "delete", "upsert", "update",
"hardDelete", "query", "queryall"),
object_name,
soql=NULL,
external_id_fieldname = NULL,
api_type = c("Bulk 1.0", "Bulk 2.0"),
content_type = c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode = c("Parallel", "Serial"),
column_delimiter = c('COMMA', 'TAB', 'PIPE', 'SEMICOLON',
'CARET', 'BACKQUOTE'),
control = list(...), ...,
line_ending = deprecated(),
verbose = FALSE){
api_type <- match.arg(api_type)
operation <- match.arg(operation)
content_type <- match.arg(content_type)
# determine how to pass along the control args
control_args <- return_matching_controls(control)
control_args$api_type <- api_type
control_args$operation <- operation
if(is_present(line_ending)) {
deprecate_warn("0.1.3", "salesforcer::sf_create_job_bulk(line_ending = )",
"sf_create_job_bulk(LineEndingHeader = )",
details = paste0("You can pass the line ending directly ",
"as shown above or via the `control` argument."))
control_args$LineEndingHeader <- list(`Sforce-Line-Ending` = line_ending)
}
if(api_type == "Bulk 1.0"){
if(!missing(column_delimiter)){
warning(paste0("Ignoring the column_delimiter argument which isn't used when ",
"calling the Bulk 1.0 API", call. = FALSE))
}
job_response <- sf_create_job_bulk_v1(operation = operation,
object_name = object_name,
external_id_fieldname = external_id_fieldname,
content_type = content_type,
concurrency_mode = concurrency_mode,
control = control_args, ...,
verbose = verbose)
} else if(api_type == "Bulk 2.0"){
bulk_v2_supported_operations <- c("insert", "delete", "upsert",
"update", "query", "queryall")
if(!(operation %in% bulk_v2_supported_operations)){
stop_w_errors_listed("Bulk 2.0 only supports the following operations:",
bulk_v2_supported_operations)
}
bulk_v2_supported_content_type <- c("CSV")
if(!(content_type %in% bulk_v2_supported_content_type)){
stop_w_errors_listed("Bulk 2.0 only supports the following content types:",
bulk_v2_supported_operations)
}
if(!missing(concurrency_mode)){
warning(paste0("Ignoring the concurrency_mode argument which is not ",
"used when calling the Bulk 2.0 API"), call. = FALSE)
}
job_response <- sf_create_job_bulk_v2(operation = operation,
object_name = object_name,
soql = soql,
external_id_fieldname = external_id_fieldname,
content_type = content_type,
column_delimiter = column_delimiter,
control = control_args, ...,
verbose = verbose)
} else {
catch_unknown_api(api_type)
}
return(job_response)
}
#' Create Job using Bulk 1.0 API
#'
#' @importFrom xml2 xml_new_document xml_add_child xml_add_sibling
#' @importFrom httr content
#' @importFrom XML xmlToList
#' @template operation
#' @template object_name
#' @template external_id_fieldname
#' @param content_type \code{character}; a string indicating the format for
#' the API request and response. Must be one of 'CSV', 'ZIP_CSV', 'ZIP_XML', or
#' 'ZIP_JSON'.
#' @param concurrency_mode \code{character}; a string indicating whether the batches
#' should be processed in parallel or serially (sequentially). Serial processing
#' is helpful when multiple records may trigger simultaneous edits to another
#' related record (e.g., updating multiple children all on the same account).
#' @template control
#' @param ... arguments to be used to form the default control argument if it is not supplied directly.
#' @template verbose
#' @return \code{tbl_df}; a data frame containing information about the job created.
#' @note This function is meant to be used internally. Only use when debugging.
#' @keywords internal
#' @export
sf_create_job_bulk_v1 <- function(operation = c("insert", "delete", "upsert", "update",
"hardDelete", "query", "queryall"),
object_name,
external_id_fieldname = NULL,
content_type = c('CSV', 'ZIP_CSV', 'ZIP_XML', 'ZIP_JSON'),
concurrency_mode = c("Parallel", "Serial"),
control, ...,
verbose = FALSE){
operation <- match.arg(operation)
content_type <- match.arg(content_type)
concurrency_mode <- match.arg(concurrency_mode)
control <- do.call("sf_control", control)
request_headers <- c("Accept"="application/xml",
"Content-Type"="application/xml")
if("BatchRetryHeader" %in% names(control)){
request_headers <- c(request_headers, c("Sforce-Disable-Batch-Retry" = control$BatchRetryHeader[[1]]))
}
if("LineEndingHeader" %in% names(control)){
bulk_supported_line_ending_headers <- c("CRLF", "LF")
if(!(control$LineEndingHeader[[1]] %in% bulk_supported_line_ending_headers)){
stop_w_errors_listed("Bulk APIs only supports the following line ending headers:",
bulk_supported_line_ending_headers)
}
request_headers <- c(request_headers, c("Sforce-Line-Ending" = control$LineEndingHeader[[1]]))
}
if("PKChunkingHeader" %in% names(control)){
if(is.logical(control$PKChunkingHeader[[1]]) |
tolower(control$PKChunkingHeader[[1]]) %in% c("true", "false")){
request_headers <- c(request_headers,
c("Sforce-Enable-PKChunking" = toupper(control$PKChunkingHeader[[1]])))
} else {
l <- control$PKChunkingHeader
value <- paste0(paste(names(l), unlist(l), sep="="), collapse = "; ")
request_headers <- c(request_headers, c("Sforce-Enable-PKChunking" = value))
}
}
# build xml for Bulk 1.0 request
body <- xml_new_document()
body %>%
xml_add_child("jobInfo",
"xmlns" = "http://www.force.com/2009/06/asyncapi/dataload") %>%
xml_add_child("operation", operation) %>%
xml_add_sibling("object", object_name)
if(operation == 'upsert'){
if(is.null(external_id_fieldname)){
stop("All 'upsert' operations require an external id field. Please specify.", call.=FALSE)
} else {
body %>% xml_add_child("externalIdFieldName", external_id_fieldname)
}
}
body %>%
xml_add_child("concurrencyMode", concurrency_mode) %>%
xml_add_child("contentType", content_type)
request_body <- as.character(body)
bulk_create_job_url <- make_bulk_create_job_url(api_type="Bulk 1.0")
httr_response <- rPOST(url = bulk_create_job_url,
headers = request_headers,
body = request_body)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
request_body)
}
catch_errors(httr_response)
response_parsed <- content(httr_response, encoding="UTF-8")
job_info <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('//jobInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
return(job_info)
}
#' Create Job using Bulk 2.0 API
#'
#' @importFrom xml2 xml_new_document xml_add_child xml_add_sibling
#' @importFrom httr content
#' @importFrom jsonlite toJSON prettify
#' @template operation
#' @template object_name
#' @template soql
#' @template external_id_fieldname
#' @param content_type \code{character}; a string indicating the format for
#' the API request and response. Must be 'CSV' because it is the only supported
#' format for the Bulk 2.0 API.
#' @param column_delimiter \code{character}; a string indicating which character
#' should be treated as the delimiter in the CSV file. Must be one of 'COMMA',
#' 'TAB', 'PIPE', 'SEMICOLON', 'CARET', or 'BACKQUOTE'.
#' @template control
#' @param ... arguments to be used to form the default control argument if it is not supplied directly.
#' @template verbose
#' @return \code{tbl_df}; a data frame containing information about the job created.
#' @note This function is meant to be used internally. Only use when debugging.
#' @keywords internal
#' @export
sf_create_job_bulk_v2 <- function(operation = c("insert", "delete",
"upsert", "update",
"query", "queryall"),
object_name,
soql = NULL,
external_id_fieldname = NULL,
content_type = 'CSV',
column_delimiter = c('COMMA', 'TAB', 'PIPE', 'SEMICOLON',
'CARET', 'BACKQUOTE'),
control, ...,
verbose=FALSE){
operation <- match.arg(operation)
column_delimiter <- match.arg(column_delimiter)
if(content_type != "CSV"){
stop("content_type = 'CSV' is currently the only supported format of returned bulk content.")
}
bulk_v2_supported_column_delimiter <- c("COMMA")
if(!(column_delimiter %in% bulk_v2_supported_column_delimiter)){
stop_w_errors_listed("Bulk 2.0 API only supports the following file delimiter:",
bulk_v2_supported_column_delimiter)
}
if(operation == 'upsert' & is.null(external_id_fieldname)){
stop("All 'upsert' operations require an external id field. Please specify.", call.=FALSE)
}
query_operation <- FALSE
if(tolower(operation) %in% c("query", "queryall")){
stopifnot(!is.null(soql))
if(tolower(operation) == "queryall"){
operation <- "queryAll"
} else {
operation <- "query"
}
query_operation <- TRUE
# set this to NULL because for V2 queries, the 'object' property is not settable
object_name <- NULL
}
control <- do.call("sf_control", control)
if("LineEndingHeader" %in% names(control)){
stopifnot(control$LineEndingHeader[[1]] %in% c("CRLF", "LF"))
line_ending <- control$LineEndingHeader$`Sforce-Line-Ending`
} else {
if(get_os() == 'windows'){
# line_ending <- "CRLF"
# readr::write_csv always uses LF as of 1.3.1, but in the next version
# will allow users to to specify CRLF, which we will do automatically if
# get_os() == 'windows'
# https://github.com/tidyverse/readr/issues/857
line_ending <- "LF"
} else {
line_ending <- "LF"
}
}
# form body from arguments
request_body <- list(operation = operation,
query = soql,
object = object_name,
contentType = content_type,
externalIdFieldName = external_id_fieldname,
lineEnding = line_ending,
columnDelimiter = column_delimiter)
request_body[sapply(request_body, is.null)] <- NULL
request_body <- toJSON(request_body, auto_unbox = TRUE)
bulk_create_job_url <- make_bulk_create_job_url(api_type = "Bulk 2.0",
query_operation = query_operation)
httr_response <- rPOST(url = bulk_create_job_url,
headers = c("Accept"="application/json",
"Content-Type"="application/json"),
body = request_body)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
prettify(request_body))
}
catch_errors(httr_response)
response_parsed <- content(httr_response, encoding="UTF-8")
job_info <- as_tibble(response_parsed)
return(job_info)
}
#' Get Bulk API Job
#'
#' This function retrieves details about a Job in the Salesforce Bulk API
#'
#' @template job_id
#' @template api_type
#' @param query_operation \code{logical}; an indicator of whether the job is a query job,
#' which is needed when using the Bulk 2.0 API because the URI endpoints are different
#' for the "ingest" vs. the "query" jobs.
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the details of the specified job id
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' refreshed_job_info <- sf_get_job_bulk(job_info$id)
#' sf_abort_job_bulk(refreshed_job_info$id)
#' }
#' @export
sf_get_job_bulk <- function(job_id,
api_type = c("Bulk 1.0", "Bulk 2.0"),
query_operation = FALSE,
verbose = FALSE){
api_type <- match.arg(api_type)
bulk_get_job_url <- make_bulk_get_job_url(job_id,
api_type=api_type,
query_operation=query_operation)
httr_response <- rGET(url = bulk_get_job_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
if(api_type == "Bulk 1.0"){
content_type <- httr_response$headers$`content-type`
if (grepl('xml', content_type)){
response_parsed <- content(httr_response, as='parsed', type="text/xml", encoding="UTF-8")
job_info <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('//jobInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
} else if(grepl('json', content_type)){
response_parsed <- content(httr_response, as='parsed', type="application/json", encoding="UTF-8")
response_parsed[sapply(response_parsed, is.null)] <- NA
job_info <- as_tibble(response_parsed)
} else {
message(sprintf("Unhandled content-type: %s", content_type))
job_info <- content(httr_response, as='parsed', encoding="UTF-8")
}
} else if(api_type == "Bulk 2.0"){
response_parsed <- content(httr_response, encoding="UTF-8")
job_info <- as_tibble(response_parsed)
} else {
catch_unknown_api(api_type, c("Bulk 1.0", "Bulk 2.0"))
}
return(job_info)
}
#' Get All Bulk API Jobs
#'
#' This function retrieves details about all Bulk jobs in the org.
#'
#' @importFrom httr content
#' @importFrom readr type_convert cols col_guess
#' @importFrom purrr map_df
#' @importFrom dplyr as_tibble bind_rows mutate_all
#' @param parameterized_search_list list; a list of parameters to be added as part
#' of the URL query string (i.e. after a question mark ("?") so that the result
#' only returns information about jobs that meet that specific criteria. For
#' more information, read the note below and/or the Salesforce documentation
#' \href{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/get_all_jobs.htm}{here}.
#' @param next_records_url character (leave as NULL); a string used internally
#' by the function to paginate through to more records until complete
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the details of all bulk jobs
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/get_all_jobs.htm}
#' @note parameterized_search_list elements that can be set to filter the results:
#' \itemize{
#' \item{isPkChunkingEnabled}{A logical either TRUE or FALSE. TRUE only returns
#' information about jobs where PK Chunking has been enabled.}
#' \item{jobType}{A character string to return jobs matching the specified type.
#' Must be one of: "BigObjectIngest", "Classic", "V2QIngest". Classic corresponds
#' to Bulk 1.0 API jobs and V2Ingest corresponds to the Bulk 2.0 API jobs.}
#' }
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' all_jobs_info <- sf_get_all_jobs_bulk()
#' # just the Bulk API 1.0 jobs
#' all_jobs_info <- sf_get_all_jobs_bulk(parameterized_search_list=list(jobType='Classic'))
#' }
#' @export
sf_get_all_jobs_bulk <- function(parameterized_search_list =
list(isPkChunkingEnabled=NULL,
jobType=NULL),
next_records_url = NULL,
api_type = c("Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
this_url <- make_bulk_get_all_jobs_url(parameterized_search_list,
next_records_url,
api_type)
httr_response <- rGET(url = this_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
response_parsed <- content(httr_response, as="parsed", encoding="UTF-8")
if(length(response_parsed$records) > 0){
resultset <- response_parsed$records %>%
map_df(as_tibble) %>%
mutate_all(as.character)
} else {
resultset <- tibble()
}
# check whether it has next record
if(!response_parsed$done){
next_records <- sf_get_all_jobs_bulk(parameterized_search_list = parameterized_search_list,
next_records_url = response_parsed$nextRecordsUrl,
api_type = api_type,
verbose = verbose)
resultset <- safe_bind_rows(list(resultset, next_records))
}
if(is.null(next_records_url) & (nrow(resultset) > 0)){
resultset <- resultset %>%
type_convert(col_types = cols(.default = col_guess()))
}
return(resultset)
}
#' Get All Bulk API Query Jobs
#'
#' This function retrieves details about all Bulk query jobs in the org.
#'
#' @importFrom httr content
#' @importFrom readr type_convert cols col_guess
#' @importFrom purrr map_df
#' @importFrom dplyr as_tibble bind_rows filter mutate_all if_all any_of
#' @param parameterized_search_list list; a list of parameters to be added as part
#' of the URL query string (i.e. after a question mark ("?") so that the result
#' only returns information about jobs that meet that specific criteria. For
#' more information, read the note below and/or the Salesforce documentation
#' \href{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/query_get_all_jobs.htm}{here}.
#' @param next_records_url character (leave as NULL); a string used internally
#' by the function to paginate through to more records until complete
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the details of all bulk jobs
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/get_all_jobs.htm}
#' @note parameterized_search_list elements that can be set to filter the results:
#' \itemize{
#' \item{isPkChunkingEnabled}{A logical either TRUE or FALSE. TRUE only returns
#' information about jobs where PK Chunking has been enabled.}
#' \item{jobType}{A character string to return jobs matching the specified type.
#' Must be one of: "BigObjectIngest", "Classic", "V2Query". Classic corresponds
#' to Bulk 1.0 API jobs and V2Query corresponds to the Bulk 2.0 API jobs.}
#' \item{concurrencyMode}{A character string to return jobs matching the specified
#' concurrency mode. Must be one of: "serial" or "parallel", but only "serial"
#' is currently supported.}
#' }
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' all_query_jobs_info <- sf_get_all_query_jobs_bulk()
#' # just the Bulk API 2.0 query jobs
#' all_query_jobs_info <- sf_get_all_query_jobs_bulk(parameterized_search_list=list(jobType='V2Query'))
#' # just the Bulk API 1.0 query jobs
#' all_query_jobs_info <- sf_get_all_query_jobs_bulk(parameterized_search_list=list(jobType='Classic'))
#' }
#' @export
sf_get_all_query_jobs_bulk <- function(parameterized_search_list =
list(isPkChunkingEnabled=NULL,
jobType=NULL,
concurrencyMode=NULL),
next_records_url = NULL,
api_type = c("Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
this_url <- make_bulk_get_all_query_jobs_url(parameterized_search_list,
next_records_url,
api_type)
httr_response <- rGET(url = this_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
response_parsed <- content(httr_response, as="parsed", encoding="UTF-8")
if(length(response_parsed$records) > 0){
resultset <- response_parsed$records %>%
map_df(as_tibble) %>%
mutate_all(as.character)
} else {
resultset <- tibble()
}
# check whether the query has more results to pull via pagination
if(!response_parsed$done){
next_records <- sf_get_all_query_jobs_bulk(parameterized_search_list = parameterized_search_list,
next_records_url = response_parsed$nextRecordsUrl,
api_type = api_type,
verbose = verbose)
resultset <- safe_bind_rows(list(resultset, next_records))
}
# cast the data in the final iteration and remove Bulk 1.0 ingest operations
# because as of 6/10/2020 the API will return both Bulk 1.0 query jobs and ingest
# jobs, for some reason, but to make it more straightforward for the user we
# will filter to only include the query jobs
if(is.null(next_records_url) & (nrow(resultset) > 0)){
# cast the data
resultset <- resultset %>%
type_convert(col_types = cols(.default = col_guess())) %>%
# ignore record ids that could not be matched
filter(if_all(any_of("operation"), ~(.x %in% c('query', 'queryall')))) # is queryall an option?
}
return(resultset)
}
#' End Bulk API Job
#'
#' @importFrom jsonlite toJSON prettify
#' @template job_id
#' @param end_type \code{character}; taking a value of "Closed" or "Aborted" indicating
#' how the bulk job should be ended
#' @template api_type
#' @template verbose
#' @return \code{logical}; returns \code{TRUE} if the job was able to be ended;
#' otherwise, an error message is printed
#' @note This function is meant to be used internally. Only use when debugging.
#' @keywords internal
#' @export
sf_end_job_bulk <- function(job_id,
end_type = c("Closed", "UploadComplete", "Aborted"),
api_type = c("Bulk 1.0", "Bulk 2.0"),
verbose = FALSE){
end_type <- match.arg(end_type)
api_type <- match.arg(api_type)
if(api_type == "Bulk 2.0" & end_type == "Closed"){
end_type <- "UploadComplete"
}
request_body <- toJSON(list(state=end_type), auto_unbox = TRUE)
bulk_end_job_url <- make_bulk_end_job_generic_url(job_id, api_type)
if(api_type == "Bulk 2.0"){
httr_response <- rPATCH(url = bulk_end_job_url,
headers = c("Accept"="application/json",
"Content-Type"="application/json"),
body = request_body)
} else {
httr_response <- rPOST(url = bulk_end_job_url,
headers = c("Accept"="application/json",
"Content-Type"="application/json"),
body = request_body)
}
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
prettify(request_body))
}
catch_errors(httr_response)
return(TRUE)
}
#' Close Bulk API Job
#'
#' This function closes a Job in the Salesforce Bulk API
#'
#' @template job_id
#' @template api_type
#' @template verbose
#' @return A \code{list} of parameters defining the now closed job
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note This is a legacy function used only with Bulk 1.0.
#' @examples
#' \dontrun{
#' my_query <- "SELECT Id, Name FROM Account LIMIT 10"
#' job_info <- sf_create_job_bulk(operation='query', object='Account')
#' query_info <- sf_submit_query_bulk(job_id=job_info$id, soql=my_query)
#' recordset <- sf_query_result_bulk(job_id = query_info$jobId,
#' batch_id = query_info$id,
#' result_id = result$result)
#' sf_close_job_bulk(job_info$id)
#' }
#' @export
sf_close_job_bulk <- function(job_id,
api_type = c("Bulk 1.0", "Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
sf_end_job_bulk(job_id, end_type = "Closed", api_type = api_type, verbose = verbose)
}
#' Signal Upload Complete to Bulk API Job
#'
#' This function signals that uploads are complete to a Job in the Salesforce Bulk API
#'
#' @template job_id
#' @template api_type
#' @template verbose
#' @return A \code{list} of parameters defining the job after signaling a completed upload
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note This function is typically not used directly. It is used in \code{sf_create_batches_bulk()}
#' right after submitting the batches to signal to Salesforce that the batches should
#' no longer be queued.
#' @examples
#' \dontrun{
#' upload_info <- sf_upload_complete_bulk(job_id=job_info$id)
#' }
#' @export
sf_upload_complete_bulk <- function(job_id,
api_type = c("Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
sf_end_job_bulk(job_id, end_type = "UploadComplete", api_type = api_type, verbose = verbose)
}
#' Abort Bulk API Job
#'
#' This function aborts a Job in the Salesforce Bulk API
#'
#' @template job_id
#' @template api_type
#' @template verbose
#' @return A \code{list} of parameters defining the now aborted job
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' sf_abort_job_bulk(job_info$id)
#' }
#' @export
sf_abort_job_bulk <- function(job_id,
api_type = c("Bulk 1.0", "Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
sf_end_job_bulk(job_id, end_type = "Aborted", api_type = api_type, verbose = verbose)
}
#' Delete Bulk API Job
#'
#' @template job_id
#' @template api_type
#' @template verbose
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' sf_abort_job_bulk(job_info$id)
#' sf_delete_job_bulk(job_info$id)
#' }
#' @export
sf_delete_job_bulk <- function(job_id,
api_type = c("Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
bulk_delete_job_url <- make_bulk_delete_job_url(job_id, api_type = api_type)
httr_response <- rDELETE(url = bulk_delete_job_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
return(TRUE)
}
#' Add Batches to a Bulk API Job
#'
#' This function takes a data frame and submits it in batches to a
#' an already existing Bulk API Job by chunking into temp files
#'
#' @importFrom httr upload_file
#' @template job_id
#' @param input_data \code{named vector}, \code{matrix}, \code{data.frame}, or
#' \code{tbl_df}; data can be coerced into CSV file for submitting as batch request
#' @template batch_size
#' @template api_type
#' @template verbose
#' @return a \code{tbl_df} containing details of each batch
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @examples
#' \dontrun{
#' # NOTE THAT YOU MUST FIRST CREATE AN EXTERNAL ID FIELD CALLED My_External_Id
#' # BEFORE RUNNING THIS EXAMPLE
#' # inserting 2 records
#' my_data <- tibble(Name=c('New Record 1', 'New Record 2'),
#' My_External_Id__c=c('11111','22222'))
#' job_info <- sf_create_job_bulk(operation='insert',
#' object='Account')
#' batches_ind <- sf_create_batches_bulk(job_id = job_info$id,
#' input_data = my_data)
#' # upserting 3 records
#' my_data2 <- tibble(My_External_Id__c=c('11111','22222', '99999'),
#' Name=c('Updated_Name1', 'Updated_Name2', 'Upserted_Record'))
#' job_info <- sf_create_job_bulk(operation='upsert',
#' externalIdFieldName='My_External_Id__c',
#' object='Account')
#' batches_ind <- sf_create_batches_bulk(job_id = job_info$id,
#' input_data = my_data2)
#' sf_get_job_bulk(job_info$id)
#' }
#' @export
sf_create_batches_bulk <- function(job_id,
input_data,
batch_size = NULL,
api_type = c("Bulk 1.0", "Bulk 2.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
if(api_type == "Bulk 1.0"){
created_batches <- sf_create_batches_bulk_v1(job_id, input_data,
batch_size = batch_size,
verbose = verbose)
} else if(api_type == "Bulk 2.0"){
created_batches <- sf_create_batches_bulk_v2(job_id, input_data,
batch_size = batch_size,
verbose = verbose)
} else {
catch_unknown_api(api_type, c("Bulk 1.0", "Bulk 2.0"))
}
return(created_batches)
}
#' @importFrom utils head
#' @importFrom stats quantile
#' @importFrom purrr transpose
#' @importFrom XML saveXML xmlDoc
#' @importFrom zip zipr
sf_create_batches_bulk_v1 <- function(job_id,
input_data,
batch_size = NULL,
verbose = FALSE){
job_status <- sf_get_job_bulk(job_id,
api_type = "Bulk 1.0",
verbose = verbose)
stopifnot(job_status$state == "Open")
if(job_status$contentType %in% c("ZIP_CSV", "ZIP_XML", "ZIP_JSON")){
binary_attachments <- TRUE
input_data <- sf_input_data_validation(operation = sprintf("%s_%s",
job_status$operation,
tolower(job_status$object)),
input_data)
input_data <- check_and_encode_files(input_data, encode=FALSE)
} else {
binary_attachments <- FALSE
input_data <- sf_input_data_validation(operation = job_status$operation, input_data)
}
if(binary_attachments){
# Binary content Bulk API Limits:
# - The length of any file name can’t exceed 512 bytes.
# - A zip file can’t exceed 10 MB.
# - The total size of the unzipped content can’t exceed 20 MB.
# - A maximum of 1,000 files can be contained in a zip file. Directories don’t
# count toward this total.
# https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_concepts_limits.htm#binary_content_title
# TODO: Determine if we want to check the following for the user or allow Salesforce
# to return errors back to the user for them to correct
# - Check that each Name and Body is unique?
# - Check file name < 512 bytes?
# - Check if the number of files in a batch is greater than 1000, if so, then repartition
if(is.null(batch_size)){
# use conservative approach and batch in 10MB unzipped, which will be < 10MB zipped
file_sizes_mb <- sapply(input_data$Body, file.size, USE.NAMES = FALSE) / 1000000
if(any(file_sizes_mb > 10)){
stop_w_errors_listed(paste0("The max file size limit is 10MB. The following ",
"files exceed that limit:"),
input_data$Name[file_sizes_mb > 10])
}
batch_id <- floor(cumsum(file_sizes_mb) / 10)
} else {
if (batch_size > 10){
message(sprintf("It appears that you have set `batch_size=%s` while uploading binary blob data.", batch_size))
message("It is highly recommended to leave `batch_size=NULL` so that the batch sizes can be optimized for you.")
}
batch_id <- (seq.int(1:length(input_data$Body))-1) %/% batch_size
}
if(verbose) message("Submitting data in ", max(batch_id) + 1, " Batches")
message_flag <- unique(as.integer(quantile(0:max(batch_id), c(0.25,0.5,0.75,1))))
bulk_batches_url <- make_bulk_batches_url(job_id, api_type="Bulk 1.0")
batches_response <- list()
tmpdir <- tempdir(TRUE)
for(batch in seq(0, max(batch_id))){
if(verbose){
batch_msg_flg <- batch %in% message_flag
if(batch_msg_flg){
message(paste0("Processing Batch # ", head(batch, 1) + 1))
}
}
# zip the lists of files from each batch
batched_data <- input_data[batch_id == batch, , drop=FALSE]
# create manifest of files in batched zip file
# convert Body file references to relative within the zip file (i.e. add leading #)
orig_paths <- batched_data$Body
batched_data$Body <- paste0("#", basename(batched_data$Body))
f1a <- tempfile(fileext = "_request.txt")
if(job_status$contentType == "ZIP_CSV"){
# make a unique file name and save the request txt file
write_csv(batched_data, f1a)
} else if(job_status$contentType == "ZIP_XML"){
xml_manifest <- build_manifest_xml_from_list(transpose(batched_data, .names=rep("sObject", nrow(batched_data))))
invisible(saveXML(xmlDoc(xml_manifest), file=f1a, indent=FALSE,
prefix='<?xml version="1.0" encoding="UTF-8"?>\n'))
} else if(job_status$contentType == "ZIP_JSON"){
# stop if all Name column values are not equal to Body without "#"
if(!all(batched_data$Name == gsub("^#", "", batched_data$Body))){
stop("When using content_type='ZIP_JSON' the values in Name must match the base file name exactly. Fix or use content_type='ZIP_CSV'.")
}
write(toJSON(batched_data), f1a)
} else {
stop("Unsupported content type for binary attachments.")
}
# copy the file and rename the file because it is required to be called
# request.txt by Salesforce
f1b <- file.path(tmpdir, "request.txt")
invisible(file.copy(from = f1a, to = f1b, overwrite = TRUE))
# create the batched zip file with the manifest inside
f2 <- tempfile(fileext = ".zip")
zipr(f2, c(f1b, orig_paths))
# set content type to match the format of the zip file and its manifest
zip_content_type <- gsub("_", "/", tolower(job_status$contentType))
httr_response <- rPOST(url = bulk_batches_url,
headers = c("Content-Type"=zip_content_type,
"Accept"="application/xml"),
body = upload_file(path=f2, type=zip_content_type))
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
sprintf("Uploaded ZIP file: %s\nWith manifest: %s", f2, f1a))
}
catch_errors(httr_response)
content_type <- httr_response$headers$`content-type`
if (grepl('xml', content_type)){
response_parsed <- content(httr_response, as='parsed', type="text/xml", encoding="UTF-8")
this_batch_info <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('//batchInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
} else if(grepl('json', content_type)){
response_parsed <- content(httr_response, as='parsed', type="application/json", encoding="UTF-8")
response_parsed[sapply(response_parsed, is.null)] <- NA
this_batch_info <- response_parsed
} else {
message(sprintf("Unhandled content-type: %s", content_type))
this_batch_info <- content(httr_response, as='parsed', encoding="UTF-8")
}
batches_response[[batch+1]] <- this_batch_info
}
} else {
# Batch sizes should be adjusted based on processing times. Start with 5000
# records and adjust the batch size based on processing time. If it takes more
# than five minutes to process a batch, it may be beneficial to reduce the batch size.
# If it takes a few seconds, the batch size should be increased. If you get a
# timeout error when processing a batch, split your batch into smaller batches,
# and try again.
# https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_planning_guidelines.htm
if(is.null(batch_size)){
batch_size <- 5000
}
row_num <- nrow(input_data)
batch_id <- (seq.int(row_num)-1) %/% batch_size
if(verbose) message("Submitting data in ", max(batch_id) + 1, " Batches")
message_flag <- unique(as.integer(quantile(0:max(batch_id), c(0.25,0.5,0.75,1))))
bulk_batches_url <- make_bulk_batches_url(job_id, api_type="Bulk 1.0")
batches_response <- list()
for(batch in seq(0, max(batch_id))){
if(verbose){
batch_msg_flg <- batch %in% message_flag
if(batch_msg_flg){
message(paste0("Processing Batch # ", head(batch, 1) + 1))
}
}
batched_data <- input_data[batch_id == batch, , drop=FALSE]
f <- tempfile()
sf_write_csv(batched_data, f)
httr_response <- rPOST(url = bulk_batches_url,
headers = c("Content-Type"="text/csv",
"Accept"="application/xml"),
body = upload_file(path=f, type="text/csv"))
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
sprintf("Uploaded CSV file: %s", f))
}
catch_errors(httr_response)
response_parsed <- content(httr_response, encoding="UTF-8")
this_batch_info <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('//batchInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
batches_response[[batch+1]] <- this_batch_info
}
}
batches_response <- safe_bind_rows(batches_response)
return(batches_response)
}
#' @importFrom utils object.size head
#' @importFrom stats quantile
sf_create_batches_bulk_v2 <- function(job_id,
input_data,
batch_size = NULL,
verbose = FALSE){
job_status <- sf_get_job_bulk(job_id,
api_type = "Bulk 2.0",
verbose = verbose)
input_data <- sf_input_data_validation(operation = job_status$operation,
input_data)
# A request can provide CSV data that does not in total exceed 150 MB of base64
# encoded content. When job data is uploaded, it is converted to base64. This
# conversion can increase the data size by approximately 50%. To account for
# the base64 conversion increase, upload data that does not exceed 100 MB.
# https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/upload_job_data.htm
if(is.null(batch_size)){
data_size <- object.size(input_data)
numb_batches <- ceiling((as.numeric(data_size)/(1024^2))/100) # 100MB / (size converted to MB)
batch_size <- ceiling(nrow(input_data) / numb_batches)
}
row_num <- nrow(input_data)
batch_id <- (seq.int(row_num)-1) %/% batch_size
if(verbose) message("Submitting data in ", max(batch_id) + 1, " Batches")
message_flag <- unique(as.integer(quantile(0:max(batch_id), c(0.25,0.5,0.75,1))))
bulk_batches_url <- make_bulk_batches_url(job_id, api_type="Bulk 2.0")
resultset <- NULL
for(batch in seq(0, max(batch_id))){
if(verbose){
batch_msg_flg <- batch %in% message_flag
if(batch_msg_flg){
message(paste0("Processing Batch # ", head(batch, 1) + 1))
}
}
batched_data <- input_data[batch_id == batch, , drop=FALSE]
f <- tempfile()
sf_write_csv(batched_data, f)
httr_response <- rPUT(url = bulk_batches_url,
headers = c("Content-Type"="text/csv",
"Accept"="application/json"),
body = upload_file(path=f, type="text/csv"))
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers,
sprintf("Uploaded CSV file: %s", f))
}
catch_errors(httr_response)
}
# the batches will not start processing (move out of Queued state) until you signal "Upload Complete"
upload_details <- sf_upload_complete_bulk(job_id, verbose = verbose)
return(upload_details)
}
#' Checking the Status of a Batch in a Bulk API Job
#'
#' This function checks on and returns status information on an existing batch
#' which has already been submitted to Bulk API Job
#'
#' @importFrom httr content
#' @importFrom xml2 xml_ns_strip xml_find_all
#' @importFrom purrr map_df
#' @importFrom readr type_convert cols
#' @template job_id
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the batch identified by the batch_id
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note This is a legacy function used only with Bulk 1.0.
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk(operation = "query", object = "Account")
#' soql <- "SELECT Id, Name FROM Account LIMIT 10"
#' batch_query_info <- sf_submit_query_bulk(job_id = job_info$id, soql = soql)
#' submitted_batches <- sf_job_batches_bulk(job_id=batch_query_info$jobId)
#' job_close_ind <- sf_close_job_bulk(job_info$id)
#' sf_get_job_bulk(job_info$id)
#' }
#' @export
sf_job_batches_bulk <- function(job_id,
api_type = c("Bulk 1.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
bulk_batch_status_url <- make_bulk_batches_url(job_id, api_type = api_type)
httr_response <- rGET(url = bulk_batch_status_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
content_type <- httr_response$headers$`content-type`
if (grepl('xml', content_type)){
response_parsed <- content(httr_response, as='parsed', type="text/xml", encoding="UTF-8")
resultset <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('.//batchInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
} else if(grepl('json', content_type)){
response_parsed <- content(httr_response, as='parsed', type="application/json", encoding="UTF-8")
response_parsed <- response_parsed$batchInfo %>%
map(function(x) map(x, function(y) ifelse(is.null(y), NA, y)))
resultset <- safe_bind_rows(response_parsed) %>%
type_convert(col_types = cols())
} else {
message(sprintf("Unhandled content-type: %s", content_type))
resultset <- content(httr_response, as='parsed', encoding="UTF-8")
}
return(resultset)
}
#' Checking the Status of a Batch in a Bulk API Job
#'
#' This function checks on and returns status information on an existing batch
#' which has already been submitted to Bulk API Job
#'
#' @importFrom httr content
#' @importFrom xml2 xml_ns_strip xml_find_all
#' @importFrom purrr map_df
#' @importFrom readr type_convert cols
#' @template job_id
#' @template batch_id
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df} of parameters defining the batch identified by the batch_id
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note This is a legacy function used only with Bulk 1.0.
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk(operation = "query", object = "Account")
#' soql <- "SELECT Id, Name FROM Account LIMIT 10"
#' batch_query_info <- sf_submit_query_bulk(job_id = job_info$id, soql = soql)
#' batch_status <- sf_batch_status_bulk(job_id = batch_query_info$jobId,
#' batch_id = batch_query_info$id)
#' job_close_ind <- sf_close_job_bulk(job_info$id)
#' sf_get_job_bulk(job_info$id)
#' }
#' @export
sf_batch_status_bulk <- function(job_id, batch_id,
api_type = c("Bulk 1.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
bulk_batch_status_url <- make_bulk_batch_status_url(job_id, batch_id, api_type = api_type)
httr_response <- rGET(url = bulk_batch_status_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
content_type <- httr_response$headers$`content-type`
if (grepl('xml', content_type)){
response_parsed <- content(httr_response, as='parsed', type="text/xml", encoding="UTF-8")
resultset <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('//batchInfo') %>%
map_df(xml_nodeset_to_df) %>%
type_convert(col_types = cols())
} else if(grepl('json', content_type)){
response_parsed <- content(httr_response, as='parsed', type="application/json", encoding="UTF-8")
response_parsed <- response_parsed$batchInfo %>%
map(function(x) map(x, function(y) ifelse(is.null(y), NA, y)))
resultset <- safe_bind_rows(response_parsed) %>%
type_convert(col_types = cols())
} else {
message(sprintf("Unhandled content-type: %s", content_type))
resultset <- content(httr_response, as='parsed', encoding="UTF-8")
}
return(resultset)
}
#' Returning the Details of a Batch in a Bulk API Job
#'
#' This function returns detailed (row-by-row) information on an existing batch
#' which has already been submitted to Bulk API Job
#'
#' @importFrom readr read_csv type_convert cols col_character
#' @importFrom httr content
#' @importFrom XML xmlToList
#' @importFrom dplyr as_tibble
#' @template job_id
#' @template batch_id
#' @template api_type
#' @template verbose
#' @return A \code{tbl_df}, formatted by Salesforce, with information containing
#' the success or failure or certain rows in a submitted batch, unless the operation
#' was query, then it is a data.frame containing the result_id for retrieving the recordset.
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note This is a legacy function used only with Bulk 1.0.
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk(operation = "query", object = "Account")
#' soql <- "SELECT Id, Name FROM Account LIMIT 10"
#' batch_query_info <- sf_submit_query_bulk(job_id = job_info$id, soql = soql)
#' batch_details <- sf_batch_details_bulk(job_id=batch_query_info$jobId,
#' batch_id=batch_query_info$id)
#' sf_close_job_bulk(job_info$id)
#' }
#' @export
sf_batch_details_bulk <- function(job_id, batch_id,
api_type = c("Bulk 1.0"),
verbose = FALSE){
api_type <- match.arg(api_type)
job_status <- sf_get_job_bulk(job_id, api_type = api_type, verbose = verbose)
bulk_batch_details_url <- make_bulk_batch_details_url(job_id, batch_id, api_type)
httr_response <- rGET(url = bulk_batch_details_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
content_type <- httr_response$headers$`content-type`
if(content_type == 'text/csv' | content_type == 'zip/csv'){
response_text <- content(httr_response, as="text", encoding="UTF-8")
# required to guess column types by all values in the column, not just first N
res <- read_csv(response_text, col_types = cols(.default = col_character()))
res <- type_convert(res, col_types = cols())
} else if(content_type == 'zip/xml'){
response_parsed <- content(httr_response, as="parsed", type="text/xml", encoding="UTF-8")
res <- response_parsed %>%
xml_ns_strip() %>%
xml_find_all('.//result') %>%
map_df(xml_nodeset_to_df)
} else if(content_type == 'zip/json'){
response_text <- content(httr_response, as="text", encoding="UTF-8")
res <- fromJSON(response_text)
} else if(grepl('xml', content_type)){
response_text <- content(httr_response, as="text", encoding="UTF-8")
res <- xmlToList(response_text)
} else {
message(sprintf("Unhandled content-type: %s", content_type))
res <- content(httr_response, as="parsed", encoding="UTF-8")
}
res <- res %>%
as_tibble() %>%
sf_reorder_cols() %>%
sf_guess_cols(TRUE)
return(res)
}
#' Returning the Details of a Bulk API Job
#'
#' This function returns detailed (row-level) information on a job
#' which has already been submitted completed (successfully or not).
#'
#' @template job_id
#' @template api_type
#' @template record_types
#' @template combine_record_types
#' @template verbose
#' @return A \code{tbl_df} or \code{list} of \code{tbl_df}, formatted by Salesforce,
#' with information containing the success or failure or certain rows in a submitted job
#' @references \url{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}
#' @note With Bulk 2.0 the order of records in the response is not guaranteed to
#' match the ordering of records in the original job data.
#' @examples
#' \dontrun{
#' job_info <- sf_create_job_bulk('insert', 'Account')
#' input_data <- tibble(Name=c("Test Account 1", "Test Account 2"))
#' batches_result <- sf_create_batches_bulk(job_info$id, input_data)
#' # pause a few seconds for operation to finish. Wait longer if job is not complete.
#' Sys.sleep(3)
#' # check status using - sf_get_job_bulk(job_info$id)
#' job_record_details <- sf_get_job_records_bulk(job_id=job_info$id)
#' }
#' @export
sf_get_job_records_bulk <- function(job_id,
api_type = c("Bulk 1.0", "Bulk 2.0"),
record_types = c("successfulResults",
"failedResults",
"unprocessedRecords"),
combine_record_types = TRUE,
verbose = FALSE){
api_type <- match.arg(api_type)
if(api_type == "Bulk 1.0"){
batch_records <- sf_get_job_records_bulk_v1(job_id, verbose = verbose)
} else if(api_type == "Bulk 2.0"){
batch_records <- sf_get_job_records_bulk_v2(job_id,
record_types = record_types,
combine_record_types = combine_record_types,
verbose = verbose)
} else {
catch_unknown_api(api_type, c("Bulk 1.0", "Bulk 2.0"))
}
return(batch_records)
}
sf_get_job_records_bulk_v1 <- function(job_id, verbose = FALSE){
batches_info <- sf_job_batches_bulk(job_id, api_type = "Bulk 1.0", verbose = verbose)
# loop through all the batches
resultset <- NULL
for(i in 1:nrow(batches_info)){
this_batch_resultset <- sf_batch_details_bulk(job_id = job_id,
batch_id = batches_info$id[i],
api_type = "Bulk 1.0",
verbose = verbose)
resultset <- safe_bind_rows(list(resultset, this_batch_resultset))
}
return(resultset)
}
#' @importFrom readr read_csv type_convert cols col_character
#' @importFrom httr content
sf_get_job_records_bulk_v2 <- function(job_id,
record_types = c("successfulResults",
"failedResults",
"unprocessedRecords"),
combine_record_types = TRUE,
verbose = FALSE){
record_types <- match.arg(record_types, several.ok = TRUE)
records <- list()
for(r in record_types){
bulk_job_records_url <- make_bulk_job_records_url(job_id, record_type = r, api_type = "Bulk 2.0")
httr_response <- rGET(url = bulk_job_records_url)
if(verbose){
make_verbose_httr_message(httr_response$request$method,
httr_response$request$url,
httr_response$request$headers)
}
catch_errors(httr_response)
response_text <- content(httr_response, as="text", encoding="UTF-8")
content_type <- httr_response$headers$`content-type`
if(grepl('text/csv', content_type)) {
# required to guess column types by all values in the column, not just first N
res <- read_csv(response_text, col_types = cols(.default = col_character()))
res <- type_convert(res, col_types = cols())
} else {
message(sprintf("Unhandled content-type: %s", content_type))
res <- content(httr_response, as="parsed", encoding="UTF-8")
}
records[[r]] <- res
}
if(combine_record_types){
# Before combining, try to match the data types of each column in the dataset
# with recordsets having 0 records taking on the datatype of the one with
# the most records. Otherwise, we will receive an error such as:
# Can't combine `test_number__c` <double> and `test_number__c` <character>.
# First, determine the column datatypes from the data.frame with most rows
base_class_df <- records[[head(which.max(sapply(records, nrow)), 1)]]
# Second, convert the datatypes for any data.frames with zero rows
# From StackOverflow post here: https://stackoverflow.com/a/47800157/5258043
# distributed under the CC BY-SA 3.0 license terms.
for (i in 1:length(records)){
if(nrow(records[[i]]) == 0){
common <- names(records[[i]])[names(records[[i]]) %in% names(base_class_df)]
records[[i]][common] <- lapply(common, function(x) {
match.fun(paste0("as.", class(base_class_df[[x]])[1]))(records[[i]][[x]])
})
}
}
# Third, now bind them together
res <- safe_bind_rows(records)
} else {
res <- records
}
return(res)
}
#' Run Bulk Operation
#'
#' @description
#' `r lifecycle::badge("stable")`
#'
#' This function is a convenience wrapper for submitting bulk API jobs
#'
#' @importFrom dplyr is.tbl
#' @param input_data \code{named vector}, \code{matrix}, \code{data.frame}, or
#' \code{tbl_df}; data can be coerced into CSV file for submitting as batch request
#' @template object_name
#' @param operation \code{character}; string defining the type of operation being performed
#' @template external_id_fieldname
#' @template guess_types
#' @template api_type
#' @template batch_size
#' @template interval_seconds
#' @template max_attempts
#' @param wait_for_results \code{logical}; indicating whether to wait for the operation to complete
#' so that the batch results of individual records can be obtained
#' @template record_types
#' @template combine_record_types
#' @template control
#' @param ... other arguments passed on to \code{\link{sf_control}} or \code{\link{sf_create_job_bulk}}
#' to specify the \code{content_type}, \code{concurrency_mode}, and/or \code{column_delimiter}.
#' @template verbose
#' @return A \code{tbl_df} of the results of the bulk job
#' @note With Bulk 2.0 the order of records in the response is not guaranteed to
#' match the ordering of records in the original job data.
#' @seealso \href{https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch}{Salesforce Documentation}
#' @examples
#' \dontrun{
#' n <- 20
#' new_contacts <- tibble(FirstName = rep("Test", n),
#' LastName = paste0("Contact", 1:n))
#' # insert new records into the Contact object
#' inserts <- sf_bulk_operation(input_data = new_contacts,
#' object_name = "Contact",
#' operation = "insert")
#' }
#' @export
sf_run_bulk_operation <- function(input_data,
object_name,
operation = c("insert", "delete", "upsert",
"update", "hardDelete"),
external_id_fieldname = NULL,
guess_types = TRUE,
api_type = c("Bulk 1.0", "Bulk 2.0"),
batch_size = NULL,
interval_seconds = 3,
max_attempts = 200,
wait_for_results = TRUE,
record_types = c("successfulResults",
"failedResults",
"unprocessedRecords"),
combine_record_types = TRUE,
control = list(...), ...,
verbose = FALSE){
stopifnot(!missing(operation))
api_type <- match.arg(api_type)
# this code is redundant because it exists in the sf_create, sf_update, etc.
# wrappers, but it is possible that some people are creating jobs with this
# function instead of the others, so make sure that we do it here as well. It
# should be a relatively small performance hit given its a bulk operation
# determine how to pass along the control args
control_args <- return_matching_controls(control)
control_args$api_type <- api_type
control_args$operation <- operation
job_info <- sf_create_job_bulk(operation,
object_name = object_name,
external_id_fieldname = external_id_fieldname,
api_type = api_type,
control = control_args, ...,
verbose = verbose)
batches_info <- sf_create_batches_bulk(job_id = job_info$id,
input_data,
batch_size = batch_size,
api_type = api_type,
verbose = verbose)
if(wait_for_results){
status_complete <- FALSE
z <- 1
Sys.sleep(interval_seconds)
while (z < max_attempts & !status_complete){
if (verbose){
if(z %% 5 == 0){
message(paste0("Attempt to retrieve records #", z))
}
}
Sys.sleep(interval_seconds)
job_status <- sf_get_job_bulk(job_info$id, api_type = api_type, verbose = verbose)
if(api_type == "Bulk 1.0"){
if(job_status$state == 'Failed' | job_status$state == 'Aborted'){
stop(job_status$stateMessage) # what does this do if job is aborted?
} else {
# check that all batches have been completed before declaring the job done
job_batches <- sf_job_batches_bulk(job_info$id, api_type=api_type, verbose=verbose)
if(all(job_batches$state == "Completed")){
status_complete <- TRUE
} else {
# continue checking the status until done or max attempts
z <- z + 1
}
}
} else if(api_type == "Bulk 2.0"){
if(job_status$state == 'Failed'){
stop(job_status$errorMessage)
} else if(job_status$state == "JobComplete"){
status_complete <- TRUE
} else {
# continue checking the status until done or max attempts
z <- z + 1
}
} else {
catch_unknown_api(api_type, c("Bulk 1.0", "Bulk 2.0"))
}
}
if (!status_complete) {
message("Function's Time Limit Exceeded. Aborting Job Now")
res <- sf_abort_job_bulk(job_info$id, api_type = api_type, verbose = verbose)
} else {
res <- sf_get_job_records_bulk(
job_info$id,
api_type = api_type,
record_types = record_types,
combine_record_types = combine_record_types,
verbose = verbose
)
# For Bulk 2.0 jobs -> INVALIDJOBSTATE: Closing already Completed Job not allowed
if(api_type == "Bulk 1.0"){
close_job_info <- sf_close_job_bulk(job_info$id, api_type = api_type, verbose = verbose)
}
}
} else {
res <- job_info # at least return the job info if not waiting for records
}
if(is.tbl(res)){
res <- res %>%
sf_reorder_cols() %>%
sf_guess_cols(guess_types)
}
return(res)
}
# allows for the inclusion of sf_run version of the function for a consistent
# interface as other "run" functions provided by the package which are a wrapper
# around more complex data processing tasks in Salesforce
#' @export
#' @rdname sf_run_bulk_operation
sf_bulk_operation <- sf_run_bulk_operation
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.