R/dbi-connection.R

Defines functions as_bq_table.BigQueryConnection as_bq_dataset.BigQueryConnection dbRemoveTable_bq dbListFields_bq dbExistsTable_bq dbReadTable_bq dbCreateTable_bq dbAppendTable_bq dbWriteTable_bq BigQueryConnection

#' @include dbi-driver.R
NULL

BigQueryConnection <- function(project,
                               dataset,
                               billing,
                               page_size = 1e4,
                               quiet = NA,
                               use_legacy_sql = FALSE,
                               bigint = c("integer", "integer64", "numeric", "character")) {

  connection_capture()

  new("BigQueryConnection",
    project = project,
    dataset = dataset,
    billing = billing,
    page_size = as.integer(page_size),
    quiet = quiet,
    use_legacy_sql = use_legacy_sql,
    bigint = match.arg(bigint)
  )
}

#' @rdname DBI
#' @export
setClass(
  "BigQueryConnection",
  contains = "DBIConnection",
  slots = list(
    project = "character",
    dataset = "ANY",
    billing = "character",
    use_legacy_sql = "logical",
    page_size = "integer",
    quiet = "logical",
    bigint = "character"
  )
)

#' @rdname DBI
#' @inheritParams methods::show
#' @export
setMethod(
  "show", "BigQueryConnection",
  function(object) {
    cat_line("<BigQueryConnection>")

    if (!is.null(object@dataset)) {
      cat_line("  Dataset: ", object@project, ".", object@dataset)
    }
    cat_line("  Billing: ", object@billing)

  })

#' @rdname DBI
#' @inheritParams DBI::dbIsValid
#' @export
setMethod(
  "dbIsValid", "BigQueryConnection",
  function(dbObj, ...) {
    TRUE
  })

#' @rdname DBI
#' @inheritParams DBI::dbDisconnect
#' @export
setMethod(
  "dbDisconnect", "BigQueryConnection",
  function(conn, ...) {
    on_connection_closed(conn)
    invisible(TRUE)
  })

#' @rdname DBI
#' @inheritParams DBI::dbSendQuery
#' @export
setMethod(
  "dbSendQuery",
  c("BigQueryConnection", "character"),
  function(conn, statement, ..., params = NULL) {
    BigQueryResult(conn, statement, params = params, ...)
  }
)

#' @rdname DBI
#' @inheritParams DBI::dbSendQuery
#' @export
setMethod("dbExecute", c("BigQueryConnection", "character"), function(conn, statement, ...) {
  ds <- if (!is.null(conn@dataset)) as_bq_dataset(conn)

  job <- bq_perform_query(statement,
    billing = conn@billing,
    default_dataset = ds,
    quiet = conn@quiet,
    ...
  )
  bq_job_wait(job, quiet = conn@quiet)

  meta <- bq_job_meta(job, "statistics(query(numDmlAffectedRows))")
  as.numeric(meta$statistics$query$numDmlAffectedRows %||% 0)
})


#' @rdname DBI
#' @inheritParams DBI::dbQuoteString
#' @export
setMethod(
  "dbQuoteString", c("BigQueryConnection", "character"),
  function(conn, x, ...) {
    if (length(x) == 0) {
      return(SQL(character()))
    }

    out <- encodeString(x, na.encode = FALSE, quote = "'")
    out[is.na(x)] <- "NULL"
    SQL(out, names = names(x))
  })

#' @rdname DBI
#' @export
setMethod(
  "dbQuoteString", c("BigQueryConnection", "SQL"),
  function(conn, x, ...) {
    x
  })

#' @rdname DBI
#' @inheritParams DBI::dbQuoteIdentifier
#' @export
setMethod(
  "dbQuoteIdentifier", c("BigQueryConnection", "character"),
  function(conn, x, ...) {
    if (length(x) == 0) {
      return(SQL(character()))
    }

    if (any(is.na(x))) {
      cli::cli_abort("{.arg x} must not contain missing values.")
    }

    if (conn@use_legacy_sql) {
      out <- paste0("[", x, "]")
    } else {
      out <- encodeString(x, quote = "`")
    }

    SQL(out, names = names(x))
  })

#' @rdname DBI
#' @export
setMethod(
  "dbQuoteIdentifier", c("BigQueryConnection", "SQL"),
  function(conn, x, ...) {
    x
  }
)

#' @rdname DBI
#' @export
setMethod(
  "dbQuoteLiteral", c("BigQueryConnection", "logical"),
  function(conn, x, ...) {
    x <- as.character(x)
    x[is.na(x)] <- "NULL"
    SQL(x, names = names(x))
  }
)

#' @rdname DBI
#' @inheritParams DBI::dbDataType
#' @export
setMethod(
  "dbDataType", "BigQueryConnection",
  function(dbObj, obj, ...) {
    data_type(obj)
  }
)

dbWriteTable_bq <- function(conn,
                            name,
                            value,
                            ...,
                            overwrite = FALSE,
                            append = FALSE,
                            field.types = NULL,
                            temporary = FALSE,
                            row.names = NA) {

  check_bool(overwrite)
  check_bool(append)

  if (!is.null(field.types)) {
    cli::cli_abort(
      "{.arg field.types} not supported by bigrquery.",
      call = quote(DBI::dbWriteTable())
    )
  }
  if (!identical(temporary, FALSE)) {
    cli::cli_abort(
      "{.code temporary = FALSE} not supported by bigrquery.",
      call = quote(DBI::dbWriteTable())
    )
  }

  if (append) {
    create_disposition <- "CREATE_NEVER"
    write_disposition <- "WRITE_APPEND"
  } else {
    create_disposition <- "CREATE_IF_NEEDED"
    write_disposition <- if (overwrite) "WRITE_TRUNCATE" else "WRITE_EMPTY"
  }
  tb <- as_bq_table(conn, name)

  bq_table_upload(
    tb,
    value,
    create_disposition = create_disposition,
    write_disposition = write_disposition,
    billing = conn@billing,
    ...
  )
  invisible(TRUE)
}

#' @rdname DBI
#' @inheritParams DBI::dbWriteTable
#' @param row.names A logical specifying whether the `row.names` should be
#'   output to the output DBMS table; if `TRUE`, an extra field whose name
#'   will be whatever the R identifier `"row.names"` maps to the DBMS (see
#'   [DBI::make.db.names()]). If `NA` will add rows names if
#'   they are characters, otherwise will ignore.
#' @param overwrite a logical specifying whether to overwrite an existing table
#'   or not. Its default is `FALSE`.
#' @param append a logical specifying whether to append to an existing table
#'   in the DBMS.  Its default is `FALSE`.
#' @param field.types,temporary Ignored. Included for compatibility with
#'   generic.
#' @export
setMethod(
  "dbWriteTable",
  c("BigQueryConnection", "character", "data.frame"),
  dbWriteTable_bq
)

#' @rdname DBI
#' @export
setMethod(
  "dbWriteTable",
  c("BigQueryConnection", "Id", "data.frame"),
  dbWriteTable_bq
)

#' @rdname DBI
#' @export
setMethod(
  "dbWriteTable",
  c("BigQueryConnection", "AsIs", "data.frame"),
  dbWriteTable_bq
)


dbAppendTable_bq <- function(conn, name, value, ..., row.names = NULL) {
  tb <- as_bq_table(conn, name)

  bq_table_upload(tb, value,
    create_disposition = "CREATE_NEVER",
    write_disposition = "WRITE_APPEND",
    ...
  )
  on_connection_updated(conn, toString(tb))

  invisible(TRUE)
}

#' @inheritParams DBI::dbAppendTable
#' @rdname DBI
#' @export
setMethod("dbAppendTable", c("BigQueryConnection", "character", "data.frame"), dbAppendTable_bq)

#' @rdname DBI
#' @export
setMethod("dbAppendTable", c("BigQueryConnection", "Id", "data.frame"), dbAppendTable_bq)

#' @rdname DBI
#' @export
setMethod("dbAppendTable", c("BigQueryConnection", "AsIs", "data.frame"), dbAppendTable_bq)

dbCreateTable_bq <- function(conn,
                             name,
                             fields,
                             ...,
                             row.names = NULL,
                             temporary = FALSE) {
  if (!identical(temporary, FALSE)) {
    cli::cli_abort(
      "{.code temporary = FALSE} not supported by bigrquery.",
      call = quote(DBI::dbCreateTable())
    )
  }

  tb <- as_bq_table(conn, name)
  bq_table_create(tb, fields)
  on_connection_updated(conn, toString(tb))

  invisible(TRUE)
}

#' @inheritParams DBI::dbCreateTable
#' @rdname DBI
#' @export
setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq)

#' @rdname DBI
#' @export
setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq)

dbReadTable_bq <- function(conn, name, ...) {
  tb <- as_bq_table(conn, name)
  bq_table_download(tb, ...)
}

#' @rdname DBI
#' @inheritParams DBI::dbReadTable
#' @export
setMethod("dbReadTable", c("BigQueryConnection", "character"), dbReadTable_bq)

#' @rdname DBI
#' @export
setMethod("dbReadTable", c("BigQueryConnection", "Id"), dbReadTable_bq)

#' @rdname DBI
#' @export
setMethod("dbReadTable", c("BigQueryConnection", "AsIs"), dbReadTable_bq)

#' @rdname DBI
#' @inheritParams DBI::dbListTables
#' @export
setMethod(
  "dbListTables", "BigQueryConnection",
  function(conn, ...) {
    if (is.null(conn@dataset)) {
      cli::cli_abort("Can't list tables without a connection `dataset`.")
    }
    ds <- bq_dataset(conn@project, conn@dataset)

    tbs <- bq_dataset_tables(ds, ...)
    map_chr(tbs, function(x) x$table)
  })

dbExistsTable_bq <- function(conn, name, ...) {
  tb <- as_bq_table(conn, name)
  bq_table_exists(tb)
}
#' @inheritParams DBI::dbExistsTable
#' @rdname DBI
#' @export
setMethod("dbExistsTable", c("BigQueryConnection", "character"), dbExistsTable_bq)

#' @rdname DBI
#' @export
setMethod("dbExistsTable", c("BigQueryConnection", "Id"), dbExistsTable_bq)

#' @rdname DBI
#' @export
setMethod("dbExistsTable", c("BigQueryConnection", "AsIs"), dbExistsTable_bq)

dbListFields_bq <- function(conn, name, ...) {
  tb <- as_bq_table(conn, name)
  flds <- bq_table_fields(tb)
  map_chr(flds, function(x) x$name)
}

#' @inheritParams DBI::dbListFields
#' @rdname DBI
#' @export
setMethod("dbListFields", c("BigQueryConnection", "character"), dbListFields_bq)

#' @rdname DBI
#' @export
setMethod("dbListFields", c("BigQueryConnection", "Id"), dbListFields_bq)

#' @rdname DBI
#' @export
setMethod("dbListFields", c("BigQueryConnection", "AsIs"), dbListFields_bq)

dbRemoveTable_bq <- function(conn, name, ...) {
  tb <- as_bq_table(conn, name)
  bq_table_delete(tb)
  on_connection_updated(conn, toString(tb))
  invisible(TRUE)
}

#' @inheritParams DBI::dbRemoveTable
#' @rdname DBI
#' @export
setMethod("dbRemoveTable", c("BigQueryConnection", "character"), dbRemoveTable_bq)

#' @rdname DBI
#' @export
setMethod("dbRemoveTable", c("BigQueryConnection", "Id"), dbRemoveTable_bq)

#' @rdname DBI
#' @export
setMethod("dbRemoveTable", c("BigQueryConnection", "AsIs"), dbRemoveTable_bq)

# nocov start
#' @rdname DBI
#' @inheritParams DBI::dbGetInfo
#' @export
setMethod(
  "dbGetInfo", "BigQueryConnection",
  function(dbObj, ...) {
    list(
      db.version = NA,
      dbname = paste0(c(dbObj@project, dbObj@dataset), collapse = "."),
      username = NA,
      host = NA,
      port = NA
    )
  })

#' @rdname DBI
#' @inheritParams DBI::dbBegin
#' @export
setMethod(
  "dbBegin", "BigQueryConnection",
  function(conn, ...) {
    testthat::skip("Not yet implemented: dbBegin(Connection)")
  })

#' @rdname DBI
#' @inheritParams DBI::dbCommit
#' @export
setMethod(
  "dbCommit", "BigQueryConnection",
  function(conn, ...) {
    testthat::skip("Not yet implemented: dbCommit(Connection)")
  })

#' @rdname DBI
#' @inheritParams DBI::dbRollback
#' @export
setMethod(
  "dbRollback", "BigQueryConnection",
  function(conn, ...) {
    testthat::skip("Not yet implemented: dbRollback(Connection)")
  })
# nocov end


# Convert to bq objects ---------------------------------------------------

#' @export
as_bq_dataset.BigQueryConnection <- function(x, ..., error_arg, error_call) {
  bq_dataset(x@project, x@dataset)
}


#' @export
as_bq_table.BigQueryConnection <- function(x, name, ...) {
  if (inherits(name, "dbplyr_table_path")) { # dbplyr 2.5.0
    pieces <- utils::getFromNamespace("table_path_components", "dbplyr")(name, x)[[1]]
  } else if (inherits(name, "dbplyr_table_ident")) { # dbplyr 2.4.0
    name <- unclass(name)
    pieces <- c(name$catalog, name$schema, name$table)
    pieces <- pieces[!is.na(pieces)]

    if (length(pieces) == 1) {
      pieces <- strsplit(pieces, ".", fixed = TRUE)[[1]]
    }
  } else if (is(name, "Id")) {
    pieces <- unname(name@name)
  } else if (is.character(name) && length(name) == 1) {
    # Technically incorrect according to the DBI spec, because it should
    # automatically quote the name; but too high risk to change now
    pieces <- strsplit(name, ".", fixed = TRUE)[[1]]
  } else {
    cli::cli_abort("{.arg name} must be a string or a dbplyr_table_ident.")
  }

  if (length(pieces) > 3) {
    cli::cli_abort("{.arg name} ({.str {name}}) must have 1-3 components.")
  }
  if (length(pieces) == 1 && is.null(x@dataset)) {
    cli::cli_abort(
      "{.arg name} ({.str {name}}) must have 2 or 3 components if the connection doesn't have a dataset."
    )
  }

  switch(length(pieces),
    bq_table(x@project, x@dataset, pieces[[1]]),
    bq_table(x@project, pieces[[1]], pieces[[2]]),
    bq_table(pieces[[1]], pieces[[2]], pieces[[3]])
  )
}
rstats-db/bigrquery documentation built on April 20, 2024, 12:03 p.m.