R/dbplyr-src.R

Defines functions compute.tbl_presto .compute_tbl_presto copy_to.PrestoConnection collect.tbl_presto copy_to.src_presto tbl.PrestoConnection tbl.src_presto src_presto

Documented in collect.tbl_presto compute.tbl_presto copy_to.PrestoConnection copy_to.src_presto src_presto tbl.PrestoConnection tbl.src_presto

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

#' @include PrestoConnection.R
NULL

#' dplyr integration to connect to a Presto database.
#'
#' Allows you to connect to an existing database through a presto connection.
#'
#' @param catalog Catalog to use in the connection
#' @param schema Schema to use in the connection
#' @param user User name to use in the connection
#' @param host Host name to connect to the database
#' @param port Port number to use with the host name
#' @param source Source to specify for the connection
#' @param session.timezone Time zone for the connection
#' @param parameters Additional parameters to pass to the connection
#' @param bigint The R type that Presto's 64-bit integer (`BIGINT`) types should
#'          be translated to. The default is `"integer"`, which returns R's
#'          `integer` type, but results in `NA` for values above/below
#'          +/-2147483647. `"integer64"` returns a [bit64::integer64], which
#'          allows the full range of 64 bit integers. `"numeric"` coerces into
#'          R's `double` type but might result in precision loss. Lastly,
#'          `"character"` casts into R's `character` type.
#' @param con An object that inherits from [RPresto::PrestoConnection-class],
#'          typically generated by [DBI::dbConnect]. When a valid connection
#'          object is supplied, other arguments are ignored.
#' @param ... For `src_presto` other arguments passed on to the underlying
#'   database connector `dbConnect`. For `tbl.src_presto`, it is
#'   included for compatibility with the generic, but otherwise ignored.
#' @export
#' @name src_presto
#' @examples
#' \dontrun{
#' # To connect to a database
#' my_db <- src_presto(
#'   catalog = "memory",
#'   schema = "default",
#'   user = Sys.getenv("USER"),
#'   host = "http://localhost",
#'   port = 8080,
#'   session.timezone = "Asia/Kathmandu"
#' )
#' # Use a PrestoConnection
#' my_con <- DBI::dbConnect(
#'   drv = RPresto::Presto(),
#'   catalog = "memory",
#'   schema = "default",
#'   user = Sys.getenv("USER"),
#'   host = "http://localhost",
#'   port = 8080,
#'   session.timezone = "Asia/Kathmandu"
#' )
#' my_db2 <- src_presto(con = my_con)
#' }
src_presto <- function(catalog = NULL,
                       schema = NULL,
                       user = NULL,
                       host = NULL,
                       port = NULL,
                       source = NULL,
                       session.timezone = NULL,
                       parameters = NULL,
                       bigint = c("integer", "integer64", "numeric", "character"),
                       con = NULL,
                       ...) {
  if (!is.null(con)) {
    # A caller-supplied connection is used as-is; it only has to be a
    # PrestoConnection. The remaining connection arguments are not consulted.
    stopifnot(inherits(con, "PrestoConnection"))
  } else {
    # No connection given: open one from the individual arguments. `%||%`
    # (defined outside this block) supplies the fallback for each argument.
    con <- DBI::dbConnect(
      drv = Presto(),
      catalog = catalog %||% character(0),
      schema = schema %||% character(0),
      user = user %||% character(0),
      host = host %||% character(0),
      port = port %||% character(0),
      source = source %||% character(0),
      session.timezone = session.timezone %||% character(0),
      parameters = parameters %||% list(),
      bigint = match.arg(bigint) %||% character(0),
      ...
    )
  }

  # Wrap the connection in a dbplyr DBI source and prepend the Presto-specific
  # class so that the `src_presto` S3 methods dispatch on it.
  presto_src <- dbplyr::src_dbi(con, auto_disconnect = FALSE)
  class(presto_src) <- c("src_presto", class(presto_src))
  presto_src
}

#' dplyr integration to connect to a table in a database.
#'
#' Use `src_presto` to connect to an existing database,
#' and `tbl` to connect to tables within that database.
#' If you're unsure of the arguments to pass, please ask your database
#' administrator for the values of these variables.
#'
#' @importFrom dplyr tbl
#' @export
#' @param src A presto src created with `src_presto`.
#' @param from Either a string giving the name of table in database, or
#'   [dplyr::sql()] describing a derived table or compound join.
#' @param vars Provide column names as a character vector
#'   to avoid retrieving them from the database.
#' @examples
#' \dontrun{
#' # First create a database connection with src_presto, then reference a tbl
#' # within that database
#' my_db <- src_presto(
#'   catalog = "memory",
#'   schema = "default",
#'   user = Sys.getenv("USER"),
#'   host = "http://localhost",
#'   port = 8080,
#'   session.timezone = "Asia/Kathmandu"
#' )
#' my_tbl <- tbl(my_db, "my_table")
#' }
#' @rdname dplyr_source_function_implementations
#' @keywords internal
tbl.src_presto <- function(src, from, ..., vars = NULL) {
  con <- src$con
  # The connection's first class is inserted into the class vector handed to
  # dbplyr::tbl_sql() below.
  con_class <- class(con)[[1]]

  # Only a non-SQL `from` is quoted as an identifier. Column names are looked
  # up through dbListFields() only when the caller did not supply `vars`
  # (dbListFields uses SHOW COLUMNS to get field names of a table).
  if (!dbplyr::is.sql(from)) {
    quoted_name <- DBI::dbQuoteIdentifier(con, from)
    if (is.null(vars)) {
      vars <- dbListFields(con, quoted_name)
    }
  }

  dbplyr::tbl_sql(
    c("presto", con_class, "dbi"),
    src = src,
    from = from,
    vars = vars,
    ...
  )
}

#' Create a remote database source table using a PrestoConnection
#'
#' Automatically create a Presto remote database source to wrap around the
#' `PrestoConnection` object via which DBI APIs can be called.
#'
#' @importFrom dplyr tbl
#' @export
#' @param src A `PrestoConnection` object produced by `DBI::dbConnect()`.
#' @param from Either a string (giving a table name) or a literal
#'          [dbplyr::sql()] string.
#' @param ... Passed on to [dbplyr::tbl_sql()]
#' @rdname dplyr_source_function_implementations
#' @keywords internal
#' @md
#' @examples
#' \dontrun{
#' # First create a database connection, then reference a tbl within that
#' # database
#' my_con <- DBI::dbConnect(
#'   drv = RPresto::Presto(),
#'   catalog = "memory",
#'   schema = "default",
#'   user = Sys.getenv("USER"),
#'   host = "http://localhost",
#'   port = 8080,
#'   session.timezone = "Asia/Kathmandu"
#' )
#' my_tbl <- tbl(my_con, "my_table")
#' }
tbl.PrestoConnection <- function(src, from, ...) {
  # Wrap the bare connection in a Presto source, then defer to the tbl()
  # generic on that source.
  presto_src <- src_presto(con = src)
  dplyr::tbl(presto_src, from = from, ...)
}

#' S3 implementation of [dplyr::copy_to()] for remote Presto source
#'
#' @importFrom dplyr copy_to
#' @export
#' @inheritParams dplyr::copy_to
#' @param with An optional WITH clause for the CREATE TABLE statement.
#' @rdname dplyr_source_function_implementations
#' @keywords internal
copy_to.src_presto <- function(dest, df, name = deparse(substitute(df)),
                               overwrite = FALSE,
                               ...,
                               with = NULL) {
  # Copies `df` into the Presto source `dest` under the table name `name` and
  # invisibly returns a tbl pointing at the result.
  #
  # The default for `name` is derived from the expression the caller passed as
  # `df`. Resolve it now: `df` is reassigned below, and once it is an ordinary
  # variable substitute(df) yields the data itself instead of that expression,
  # which would turn the default table name into a deparse of the whole data
  # frame.
  force(name)

  if (inherits(df, "tbl_sql") && dplyr::same_src(df$src, dest)) {
    # `df` is a lazy table from the same source: compute it under `name`
    # rather than collecting it locally first.
    out <- dplyr::compute(df,
      name = name,
      temporary = FALSE,
      analyze = FALSE,
      ...
    )
  } else {
    # Otherwise collect `df` into a plain data frame and upload it.
    df <- as.data.frame(dplyr::collect(df))
    name <- dbplyr::db_copy_to(
      con = dest$con, table = name, values = df,
      overwrite = overwrite,
      types = NULL,
      temporary = FALSE,
      analyze = FALSE,
      in_transaction = FALSE,
      with = with,
      ...
    )
    # The column names are already known locally, so pass them along to tbl().
    vars <- names(df)
    out <- dplyr::tbl(src = dest, from = name, vars = vars)
  }
  invisible(out)
}

#' S3 implementation of `collect` for Presto.
#'
#' @importFrom dplyr collect
#' @export
#' @rdname dplyr_function_implementations
#' @keywords internal
collect.tbl_presto <- function(x, ..., n = Inf, warn_incomplete = TRUE) {
  # A finite `n` is applied to the lazy query through head(); an infinite `n`
  # is handed to db_collect() as -1.
  if (!identical(n, Inf)) {
    x <- utils::head(x, n)
  } else {
    n <- -1
  }

  con <- dbplyr::remote_con(x)
  query <- dbplyr::db_sql_render(con, x)

  # Unlike the default dbplyr::collect.tbl_sql(), `...` is forwarded to
  # db_collect() so that `bigint` can be given in collect() to specify the
  # BIGINT treatment.
  rows <- dbplyr::db_collect(
    con, query,
    n = n, warn_incomplete = warn_incomplete, ...
  )

  # Re-apply the grouping of `x`, limited to columns present in the result.
  group_cols <- intersect(dbplyr::op_grps(x), names(rows))
  dplyr::grouped_df(rows, group_cols)
}

#' S3 implementation of [dplyr::copy_to()] for PrestoConnection
#'
#' @importFrom dplyr copy_to
#' @export
#' @inheritParams dplyr::copy_to
#' @rdname dplyr_source_function_implementations
#' @keywords internal
copy_to.PrestoConnection <- function(dest, df, name = deparse(substitute(df)),
                                     overwrite = FALSE, ..., with = NULL) {
  # Wrap the bare connection in a Presto source and hand every argument on to
  # the copy_to() generic for that source.
  presto_src <- src_presto(con = dest)
  copy_to(
    dest = presto_src,
    df = df,
    name = name,
    overwrite = overwrite,
    ...,
    with = with
  )
}

.compute_tbl_presto <- function(x, name, temporary = FALSE, ..., cte = FALSE) {
  # Shared worker for compute(): either registers the query of `x` as a CTE on
  # the connection's session (cte = TRUE) or computes it through db_compute().
  # Returns the resulting name.
  name <- unname(name)

  if (!identical(cte, TRUE)) {
    # Regular path: render the query and let db_compute() store it.
    rendered <- dbplyr::db_sql_render(
      dbplyr::remote_con(x), x, use_presto_cte = TRUE
    )
    stored_name <- dbplyr::db_compute(
      dbplyr::remote_con(x), name, rendered, temporary = temporary, ...
    )
    return(stored_name)
  }

  # CTE path: a table without any pending operation has nothing to save.
  if (inherits(x$lazy_query, "lazy_base_remote_query")) {
    stop(
      "No operations need to be computed. Aborting compute.",
      call. = FALSE
    )
  }
  con <- dbplyr::remote_con(x)
  # We need to specify sql_options here so that use_presto_cte is passed to
  # db_sql_render correctly
  # (see https://github.com/tidyverse/dbplyr/issues/1394)
  rendered <- dbplyr::db_sql_render(
    con = con, sql = x,
    sql_options = dbplyr::sql_options(), use_presto_cte = FALSE
  )
  con@session$addCTE(name, rendered, replace = TRUE)
  name
}

#' S3 implementation of `compute` for Presto.
#'
#' @importFrom dplyr compute
#' @importFrom dplyr %>%
#' @importFrom rlang !!!
#' @export
#' @param x A lazy data frame backed by a database query.
#' @param cte `r lifecycle::badge("experimental")`
#'   An experimental feature to save the query to a common table expression.
#'   Defaults to `FALSE`. See `vignette("common-table-expressions")`
#' @rdname dplyr_function_implementations
compute.tbl_presto <- function(x, name, temporary = FALSE, ..., cte = FALSE) {
  # Store the query (as a table or as a CTE) and get back the name to read
  # it from.
  stored_name <- .compute_tbl_presto(
    x = x, name = name, temporary = temporary, ..., cte = cte
  )

  # Point a new tbl at the stored result, reusing the column names of `x`.
  result <- dplyr::tbl(
    src = dbplyr::remote_src(x),
    from = stored_name,
    vars = colnames(x)
  )

  # Carry the grouping and window ordering of `x` over to the new tbl.
  result <- dplyr::group_by(result, !!!rlang::syms(dbplyr::op_grps(x)))
  dbplyr::window_order(result, !!!dbplyr::op_sort(x))
}
prestodb/RPresto documentation built on April 30, 2024, 1:34 a.m.