R/bq-perform.R

#' BigQuery jobs: perform a job
#'
#' @description
#' These functions are low-level functions designed to be used by experts.
#' Each is paired with a high-level function that you should use instead:
#'
#' * `bq_perform_copy()`:    [bq_table_copy()].
#' * `bq_perform_query()`:   [bq_dataset_query()], [bq_project_query()].
#' * `bq_perform_upload()`:  [bq_table_upload()].
#' * `bq_perform_load()`:    [bq_table_load()].
#' * `bq_perform_extract()`: [bq_table_save()].
#'
#' @section Google BigQuery API documentation:
#' * [jobs](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs)
#'
#' Additional information at:
#' * [exporting data](https://cloud.google.com/bigquery/docs/exporting-data)
#' * [loading data](https://cloud.google.com/bigquery/docs/loading-data)
#' * [writing queries](https://cloud.google.com/bigquery/docs/writing-results)
#' * [copying a table](https://cloud.google.com/bigquery/docs/managing-tables#copy-table)
#'
#' @return A [bq_job], except for `bq_perform_query_dry_run()`, which returns
#'   an estimate of the number of bytes that the query would process.
#' @keywords internal
#' @examplesIf bq_testable()
#' ds <- bq_test_dataset()
#' bq_mtcars <- bq_table(ds, "mtcars")
#' job <- bq_perform_upload(bq_mtcars, mtcars)
#' bq_table_exists(bq_mtcars)
#'
#' bq_job_wait(job)
#' bq_table_exists(bq_mtcars)
#' head(bq_table_download(bq_mtcars))
#' @name api-perform
NULL

#' @export
#' @name api-perform
#' @param x A [bq_table]
#' @param destination_uris A character vector of fully-qualified Google Cloud
#'   Storage URIs where the extracted table should be written. Can export
#'   up to 1 GB of data per file. Use a wildcard URI (e.g.
#'   `gs://[YOUR_BUCKET]/file-name-*.json`) to automatically create any
#'   number of files.
#' @param destination_format The exported file format. Possible values
#'   include "CSV", "NEWLINE_DELIMITED_JSON", and "AVRO". Tables with nested or
#'   repeated fields cannot be exported as CSV.
#' @param compression The compression type to use for exported files. Possible
#'   values include "GZIP", "DEFLATE", "SNAPPY", and "NONE". "DEFLATE" and
#'   "SNAPPY" are only supported for Avro.
#' @param ... Additional arguments passed on to the underlying API call.
#'   snake_case names are automatically converted to camelCase.
#' @param print_header Whether to print out a header row in the results.
#' @param billing Identifier of project to bill.
bq_perform_extract <- function(x,
                               destination_uris,
                               destination_format = "NEWLINE_DELIMITED_JSON",
                               compression = "NONE",
                               ...,
                               print_header = TRUE,
                               billing = x$project) {

  x <- as_bq_table(x)
  destination_uris <- as.character(destination_uris) # for gs_object
  check_string(destination_format)
  check_string(compression)
  check_bool(print_header)
  check_string(billing)

  url <- bq_path(billing, jobs = "")
  body <- list(
    configuration = list(
      extract = list(
        sourceTable = tableReference(x),
        destinationUris = as.list(destination_uris),
        destinationFormat = unbox(destination_format),
        compression = unbox(compression),
        printHeader = unbox(print_header)
      )
    )
  )

  res <- bq_post(
    url,
    body = bq_body(body, ...),
    query = list(fields = "jobReference")
  )
  as_bq_job(res$jobReference)
}
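
# Usage sketch (not run): extract a table to Cloud Storage as gzipped CSV and
# wait for the job to finish. The project, dataset, and bucket names below are
# hypothetical placeholders.
#
#   tb <- bq_table("my-project", "my_dataset", "natality")
#   job <- bq_perform_extract(
#     tb,
#     destination_uris = "gs://my-bucket/natality-*.csv",
#     destination_format = "CSV",
#     compression = "GZIP"
#   )
#   bq_job_wait(job)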

#' @export
#' @name api-perform
#' @param values Data frame of values to insert.
#' @param create_disposition Specifies whether the job is allowed to create
#'   new tables.
#'
#'   The following values are supported:
#'   * "CREATE_IF_NEEDED": If the table does not exist, BigQuery creates the
#'     table.
#'   * "CREATE_NEVER": The table must already exist. If it does not, a
#'     'notFound' error is returned in the job result.
#' @param write_disposition Specifies the action that occurs if the
#'   destination table already exists. The following values are supported:
#'
#'   * "WRITE_TRUNCATE": If the table already exists, BigQuery overwrites the
#'     table data.
#'   * "WRITE_APPEND": If the table already exists, BigQuery appends the data
#'     to the table.
#'   * "WRITE_EMPTY": If the table already exists and contains data, a
#'     'duplicate' error is returned in the job result.
bq_perform_upload <- function(x, values,
                              fields = NULL,
                              create_disposition = "CREATE_IF_NEEDED",
                              write_disposition = "WRITE_EMPTY",
                              ...,
                              billing = x$project
                              ) {

  x <- as_bq_table(x)
  if (!is.data.frame(values)) {
    cli::cli_abort("{.arg values} must be a data frame.")
  }
  fields <- as_bq_fields(fields)
  check_string(create_disposition)
  check_string(write_disposition)
  check_string(billing)

  load <- list(
    sourceFormat = unbox("NEWLINE_DELIMITED_JSON"),
    destinationTable = tableReference(x),
    createDisposition = unbox(create_disposition),
    writeDisposition = unbox(write_disposition)
  )

  if (!is.null(fields)) {
    load$schema <- list(fields = as_json(fields))
  }
  if (!bq_table_exists(x)) {
    load$autodetect <- unbox(TRUE)
  }

  config <- list(configuration = list(load = load))
  config <- bq_body(config, ...)
  config_part <- part(
    c("Content-type" = "application/json; charset=UTF-8"),
    jsonlite::toJSON(config, pretty = TRUE)
  )

  data_part <- part(
    c("Content-type" = "application/json; charset=UTF-8"),
    export_json(values)
  )

  url <- bq_path(billing, jobs = "")
  res <- bq_upload(
    url,
    parts = c(config_part, data_part),
    query = list(fields = "jobReference")
  )
  as_bq_job(res$jobReference)
}
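
# Usage sketch (not run): upload a data frame, replacing any existing contents
# of the destination table. The table reference is a hypothetical placeholder
# (see the roxygen examples above for a version using bq_test_dataset()).
#
#   tb <- bq_table("my-project", "my_dataset", "mtcars")
#   job <- bq_perform_upload(tb, mtcars, write_disposition = "WRITE_TRUNCATE")
#   bq_job_wait(job)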

# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#details_of_loading_json_data
export_json <- function(values) {
  # Eliminate row names
  rownames(values) <- NULL

  # Convert times to canonical format
  is_time <- vapply(values, function(x) inherits(x, "POSIXt"), logical(1))
  values[is_time] <- lapply(values[is_time], format, "%Y-%m-%d %H:%M:%S")

  # Convert wk_wkt to text
  is_wk <- vapply(values, function(x) inherits(x, "wk_vctr"), logical(1))
  values[is_wk] <- lapply(values[is_wk], as.character)

  # Unbox blobs
  is_blob <- vapply(values, function(x) inherits(x, "blob"), logical(1))
  values[is_blob] <- lapply(values[is_blob], function(x) {
    vapply(x, jsonlite::base64_enc, character(1))
  })

  con <- rawConnection(raw(0), "r+")
  defer(close(con))
  jsonlite::stream_out(values, con, verbose = FALSE, na = "null")

  rawToChar(rawConnectionValue(con))
}
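
# Illustration (not run): export_json() returns newline-delimited JSON, one
# object per row, which is the format the upload job above declares via
# sourceFormat = "NEWLINE_DELIMITED_JSON". Output abbreviated.
#
#   cat(export_json(head(mtcars, 2)))
#   #> {"mpg":21,"cyl":6,"disp":160,...}
#   #> {"mpg":21,"cyl":6,"disp":160,...}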

#' @export
#' @name api-perform
#' @param source_uris The fully-qualified URIs that point to your data in
#'   Google Cloud.
#'
#'   For Google Cloud Storage URIs: Each URI can contain one `'*'`
#'   wildcard character and it must come after the 'bucket' name.
#'   Size limits related to load jobs apply to external data sources.
#'
#'   For Google Cloud Bigtable URIs: Exactly one URI can be specified and
#'   it has to be a fully specified and valid HTTPS URL for a Google Cloud
#'   Bigtable table. For Google Cloud Datastore backups: Exactly one URI
#'   can be specified. Also, the '*' wildcard character is not allowed.
#' @param source_format The format of the data files:
#'   * For CSV files, specify "CSV".
#'   * For datastore backups, specify "DATASTORE_BACKUP".
#'   * For newline-delimited JSON, specify "NEWLINE_DELIMITED_JSON".
#'   * For Avro, specify "AVRO".
#'   * For parquet, specify "PARQUET".
#'   * For orc, specify "ORC".
#' @param fields A [bq_fields] specification, or something coercible to it
#'   (like a data frame). Leave as `NULL` to allow BigQuery to auto-detect
#'   the fields.
#' @param nskip For `source_format = "CSV"`, the number of header rows to skip.
bq_perform_load <- function(x,
                            source_uris,
                            billing = x$project,
                            source_format = "NEWLINE_DELIMITED_JSON",
                            fields = NULL,
                            nskip = 0,
                            create_disposition = "CREATE_IF_NEEDED",
                            write_disposition = "WRITE_EMPTY",
                            ...
                            ) {
  x <- as_bq_table(x)
  source_uris <- as.character(source_uris)
  check_string(billing)
  check_string(source_format)
  check_number_whole(nskip, min = 0)
  check_string(create_disposition)
  check_string(write_disposition)

  load <- list(
    sourceUris = as.list(source_uris),
    sourceFormat = unbox(source_format),
    destinationTable = tableReference(x),
    createDisposition = unbox(create_disposition),
    writeDisposition = unbox(write_disposition)
  )

  if (source_format == "CSV") {
    load$skipLeadingRows <- unbox(nskip)
  }

  if (!is.null(fields)) {
    fields <- as_bq_fields(fields)
    load$schema <- list(fields = as_json(fields))
  } else {
    load$autodetect <- unbox(TRUE)
  }

  body <- list(configuration = list(load = load))

  url <- bq_path(billing, jobs = "")
  res <- bq_post(
    url,
    body = bq_body(body, ...),
    query = list(fields = "jobReference")
  )
  as_bq_job(res$jobReference)
}
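
# Usage sketch (not run): load CSV files from Cloud Storage, skipping one
# header row per file and letting BigQuery auto-detect the schema. The table
# and bucket names are hypothetical placeholders.
#
#   tb <- bq_table("my-project", "my_dataset", "daily_events")
#   job <- bq_perform_load(
#     tb,
#     source_uris = "gs://my-bucket/exports/daily-*.csv",
#     source_format = "CSV",
#     nskip = 1,
#     write_disposition = "WRITE_APPEND"
#   )
#   bq_job_wait(job)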

#' @export
#' @rdname api-perform
#' @param query SQL query string.
#' @param parameters Named list of parameters matched to query parameters.
#'   Parameter `x` will be matched to placeholder `@x`.
#'
#'   Generally, you can supply R vectors and they will be automatically
#'   converted to the correct type. If you need greater control, you can call
#'   [bq_param_scalar()] or [bq_param_array()] explicitly.
#'
#'   See <https://cloud.google.com/bigquery/docs/parameterized-queries>
#'   for more details.
#' @param destination_table A [bq_table] where results should be stored.
#'   If not supplied, results will be saved to a temporary table that lives
#'   in a special dataset. You must supply this parameter for large
#'   queries (> 128 MB compressed).
#' @param priority Specifies a priority for the query. Possible values include
#'   "INTERACTIVE" and "BATCH". Batch queries do not start immediately,
#'   but are not rate-limited in the same way as interactive queries.
#' @param default_dataset A [bq_dataset] used to automatically qualify table names.
#' @param use_legacy_sql If `TRUE`, uses BigQuery's legacy SQL format.
bq_perform_query <- function(query, billing,
                             ...,
                             parameters = NULL,
                             destination_table = NULL,
                             default_dataset = NULL,
                             create_disposition = "CREATE_IF_NEEDED",
                             write_disposition = "WRITE_EMPTY",
                             use_legacy_sql = FALSE,
                             priority = "INTERACTIVE"
                             ) {

  query <- as_query(query)
  check_string(billing)

  check_string(create_disposition)
  check_string(write_disposition)
  check_bool(use_legacy_sql)
  check_string(priority)

  query <- list(
    query = unbox(query),
    useLegacySql = unbox(use_legacy_sql),
    priority = unbox(priority)
  )

  if (length(parameters) > 0) {
    parameters <- as_bq_params(parameters)
    query$queryParameters <- as_json(parameters)
  }

  if (!is.null(destination_table)) {
    query$destinationTable <- tableReference(destination_table)
    query$createDisposition <- unbox(create_disposition)
    query$writeDisposition <- unbox(write_disposition)
    if (use_legacy_sql) {
      query$allowLargeResults <- unbox(TRUE)
    }
  }

  if (!is.null(default_dataset)) {
    query$defaultDataset <- datasetReference(default_dataset)
  }

  url <- bq_path(billing, jobs = "")
  body <- list(configuration = list(query = query))

  res <- bq_post(
    url,
    body = bq_body(body, ...),
    query = list(fields = "jobReference")
  )
  as_bq_job(res$jobReference)
}
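
# Usage sketch (not run): run a parameterized query against a default dataset
# and wait for it to complete. The project and dataset names are hypothetical
# placeholders; use bq_param_scalar()/bq_param_array() for finer control over
# parameter types.
#
#   job <- bq_perform_query(
#     "SELECT COUNT(*) AS n FROM flights WHERE year = @year",
#     billing = "my-project",
#     default_dataset = bq_dataset("my-project", "my_dataset"),
#     parameters = list(year = 2023L)
#   )
#   bq_job_wait(job)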

#' @export
#' @rdname api-perform
bq_perform_query_dry_run <- function(query, billing,
                                     ...,
                                     default_dataset = NULL,
                                     parameters = NULL,
                                     use_legacy_sql = FALSE) {

  check_string(query)
  check_string(billing)
  check_bool(use_legacy_sql)

  query <- list(
    query = unbox(query),
    useLegacySql = unbox(use_legacy_sql)
  )
  if (!is.null(parameters)) {
    parameters <- as_bq_params(parameters)
    query$queryParameters <- as_json(parameters)
  }
  if (!is.null(default_dataset)) {
    query$defaultDataset <- datasetReference(default_dataset)
  }

  url <- bq_path(billing, jobs = "")
  body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))

  res <- bq_post(
    url,
    body = bq_body(body, ...),
    query = list(fields = "statistics")
  )
  bytes <- as.numeric(res$statistics$query$totalBytesProcessed)
  structure(bytes, class = "bq_bytes")
}
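
# Usage sketch (not run): estimate how many bytes a query would scan without
# running it; the return value is the totalBytesProcessed reported by the dry
# run. The project and dataset names are hypothetical placeholders.
#
#   bq_perform_query_dry_run(
#     "SELECT * FROM my_dataset.flights",
#     billing = "my-project"
#   )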

#' @export
#' @rdname api-perform
bq_perform_copy <- function(src, dest,
                            create_disposition = "CREATE_IF_NEEDED",
                            write_disposition = "WRITE_EMPTY",
                            ...,
                            billing = NULL) {

  billing <- billing %||% dest$project
  url <- bq_path(billing, jobs = "")

  body <- list(
    configuration = list(
      copy = list(
        sourceTable = tableReference(src),
        destinationTable = tableReference(dest),
        createDisposition = unbox(create_disposition),
        writeDisposition = unbox(write_disposition)
      )
    )
  )

  res <- bq_post(
    url,
    body = bq_body(body, ...),
    query = list(fields = "jobReference")
  )
  as_bq_job(res$jobReference)
}
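
# Usage sketch (not run): copy a table into another dataset, erroring if the
# destination already contains data (the default "WRITE_EMPTY"). The table
# references are hypothetical placeholders.
#
#   src <- bq_table("my-project", "staging", "events")
#   dest <- bq_table("my-project", "analytics", "events")
#   job <- bq_perform_copy(src, dest)
#   bq_job_wait(job)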