# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#' @include PrestoConnection.R
NULL
#' dplyr integration to connect to a Presto database.
#'
#' Allows you to connect to an existing database through a presto connection.
#'
#' @param catalog Catalog to use in the connection
#' @param schema Schema to use in the connection
#' @param user User name to use in the connection
#' @param host Host name to connect to the database
#' @param port Port number to use with the host name
#' @param source Source to specify for the connection
#' @param session.timezone Time zone for the connection
#' @param parameters Additional parameters to pass to the connection
#' @param bigint The R type that Presto's 64-bit integer (`BIGINT`) types should
#' be translated to. The default is `"integer"`, which returns R's
#' `integer` type, but results in `NA` for values above/below
#' +/-2147483647. `"integer64"` returns a [bit64::integer64], which
#' allows the full range of 64 bit integers. `"numeric"` coerces into
#' R's `double` type but might result in precision loss. Lastly,
#' `"character"` casts into R's `character` type.
#' @param con An object that inherits from [RPresto::PrestoConnection-class],
#' typically generated by [DBI::dbConnect]. When a valid connection
#' object is supplied, other arguments are ignored.
#' @param ... For `src_presto` other arguments passed on to the underlying
#' database connector `dbConnect`. For `tbl.src_presto`, it is
#' included for compatibility with the generic, but otherwise ignored.
#' @export
#' @name src_presto
#' @examples
#' \dontrun{
#' # To connect to a database
#' my_db <- src_presto(
#' catalog = "memory",
#' schema = "default",
#' user = Sys.getenv("USER"),
#' host = "http://localhost",
#' port = 8080,
#' session.timezone = "Asia/Kathmandu"
#' )
#' # Use a PrestoConnection
#' my_con <- DBI::dbConnect(
#' drv = RPresto::Presto(),
#' catalog = "memory",
#' schema = "default",
#' user = Sys.getenv("USER"),
#' host = "http://localhost",
#' port = 8080,
#' session.timezone = "Asia/Kathmandu"
#' )
#' my_db2 <- src_presto(con = my_con)
#' }
src_presto <- function(catalog = NULL,
                       schema = NULL,
                       user = NULL,
                       host = NULL,
                       port = NULL,
                       source = NULL,
                       session.timezone = NULL,
                       parameters = NULL,
                       bigint = c("integer", "integer64", "numeric", "character"),
                       con = NULL,
                       ...) {
  if (!is.null(con)) {
    # A connection was supplied: only verify its class. The remaining
    # connection arguments are not consulted on this path.
    stopifnot(inherits(con, "PrestoConnection"))
  } else {
    # No connection supplied: open one. Each unset (NULL) argument is replaced
    # by an empty value of the matching type before being handed to dbConnect().
    con <- DBI::dbConnect(
      drv = Presto(),
      catalog = catalog %||% character(0),
      schema = schema %||% character(0),
      user = user %||% character(0),
      host = host %||% character(0),
      port = port %||% character(0),
      source = source %||% character(0),
      session.timezone = session.timezone %||% character(0),
      parameters = parameters %||% list(),
      bigint = match.arg(bigint) %||% character(0),
      ...
    )
  }
  # Wrap the connection in a dbplyr source and tag it with the Presto-specific
  # class so that the src_presto S3 methods are dispatched.
  presto_src <- dbplyr::src_dbi(con, auto_disconnect = FALSE)
  class(presto_src) <- c("src_presto", class(presto_src))
  presto_src
}
#' dplyr integration to connect to a table in a database.
#'
#' Use `src_presto` to connect to an existing database,
#' and `tbl` to connect to tables within that database.
#' If you're unsure of the arguments to pass, please ask your database
#' administrator for the values of these variables.
#'
#' @importFrom dplyr tbl
#' @export
#' @param src A presto src created with `src_presto`.
#' @param from Either a string giving the name of a table in the database, or
#' [dplyr::sql()] describing a derived table or compound join.
#' @param vars Provide column names as a character vector
#' to avoid retrieving them from the database.
#' @examples
#' \dontrun{
#' # First create a database connection with src_presto, then reference a tbl
#' # within that database
#' my_db <- src_presto(
#' catalog = "memory",
#' schema = "default",
#' user = Sys.getenv("USER"),
#' host = "http://localhost",
#' port = 8080,
#' session.timezone = "Asia/Kathmandu"
#' )
#' my_tbl <- tbl(my_db, "my_table")
#' }
#' @rdname dplyr_source_function_implementations
#' @keywords internal
tbl.src_presto <- function(src, from, ..., vars = NULL) {
  presto_con <- src$con
  # The connection's most specific class becomes part of the tbl's class vector.
  con_class <- class(presto_con)[[1]]
  from_is_table <- !dbplyr::is.sql(from)
  if (from_is_table) {
    quoted_name <- DBI::dbQuoteIdentifier(presto_con, from)
    if (is.null(vars)) {
      # dbListFields uses SHOW COLUMNS to get field names of a table
      vars <- dbListFields(presto_con, quoted_name)
    }
  }
  tbl_classes <- c("presto", con_class, "dbi")
  dbplyr::tbl_sql(tbl_classes, src = src, from = from, vars = vars, ...)
}
#' Create a remote database source table using a PrestoConnection
#'
#' Automatically create a Presto remote database source to wrap around the
#' `PrestoConnection` object via which DBI APIs can be called.
#'
#' @importFrom dplyr tbl
#' @export
#' @param src A `PrestoConnection` object produced by `DBI::dbConnect()`.
#' @param from Either a string (giving a table name) or a literal
#' [dbplyr::sql()] string.
#' @param ... Passed on to [dbplyr::tbl_sql()]
#' @rdname dplyr_source_function_implementations
#' @keywords internal
#' @md
#' @examples
#' \dontrun{
#' # First create a database connection, then reference a tbl within that
#' # database
#' my_con <- DBI::dbConnect(
#' drv = RPresto::Presto(),
#' catalog = "memory",
#' schema = "default",
#' user = Sys.getenv("USER"),
#' host = "http://localhost",
#' port = 8080,
#' session.timezone = "Asia/Kathmandu"
#' )
#' my_tbl <- tbl(my_con, "my_table")
#' }
tbl.PrestoConnection <- function(src, from, ...) {
  # Wrap the bare connection in a src_presto, then let the generic dispatch on
  # the wrapped source.
  presto_src <- src_presto(con = src)
  dplyr::tbl(presto_src, from = from, ...)
}
#' S3 implementation of [dplyr::copy_to()] for remote Presto source
#'
#' If `df` is already a lazy SQL table living on the same source as `dest`, it
#' is materialised with [dplyr::compute()]. Otherwise `df` is collected into a
#' local data frame and uploaded with [dbplyr::db_copy_to()]. In both cases the
#' table is requested as non-temporary and the result is returned invisibly.
#'
#' @importFrom dplyr copy_to
#' @export
#' @inheritParams dplyr::copy_to
#' @param with An optional WITH clause for the CREATE TABLE statement.
#' @rdname dplyr_source_function_implementations
#' @keywords internal
copy_to.src_presto <- function(dest, df, name = deparse(substitute(df)), overwrite = FALSE,
                               ...,
                               with = NULL) {
  # Resolve the default `name` while `df` is still the caller's promise. The
  # upload branch below reassigns `df`; if `name` were forced only after that,
  # substitute(df) would return the collected data frame itself and the default
  # table name would become a deparsed dump of the data.
  force(name)
  if (inherits(df, "tbl_sql") && dplyr::same_src(df$src, dest)) {
    # Same source: compute the lazy query into a table remotely.
    out <- dplyr::compute(df,
      name = name,
      temporary = FALSE,
      analyze = FALSE,
      ...
    )
  } else {
    # Different source or local data: pull it into memory and upload it.
    df <- as.data.frame(dplyr::collect(df))
    name <- dbplyr::db_copy_to(
      con = dest$con, table = name, values = df,
      overwrite = overwrite,
      types = NULL,
      temporary = FALSE,
      analyze = FALSE,
      in_transaction = FALSE,
      with = with,
      ...
    )
    # Column names are already known locally, so pass them along to spare
    # tbl() a field lookup.
    vars <- names(df)
    out <- dplyr::tbl(src = dest, from = name, vars = vars)
  }
  invisible(out)
}
#' S3 implementation of `collect` for Presto.
#'
#' @importFrom dplyr collect
#' @export
#' @rdname dplyr_function_implementations
#' @keywords internal
collect.tbl_presto <- function(x, ..., n = Inf, warn_incomplete = TRUE) {
  fetch_all <- identical(n, Inf)
  if (fetch_all) {
    # An unlimited request is handed to db_collect() as n = -1.
    n <- -1
  } else {
    # A finite n is applied to the lazy query itself before rendering.
    x <- utils::head(x, n)
  }
  presto_con <- dbplyr::remote_con(x)
  query <- dbplyr::db_sql_render(presto_con, x)
  # This is the one place whereby this implementation is different from the
  # default dbplyr::collect.tbl_sql()
  # We pass ... to db_collect() here so that bigint can be used in collect()
  # to specify the BIGINT treatment
  rows <- dbplyr::db_collect(
    presto_con, query,
    n = n, warn_incomplete = warn_incomplete, ...
  )
  # Re-apply only those grouping variables that exist in the collected result.
  group_vars <- intersect(dbplyr::op_grps(x), names(rows))
  dplyr::grouped_df(rows, group_vars)
}
#' S3 implementation of [dplyr::copy_to()] for PrestoConnection
#'
#' @importFrom dplyr copy_to
#' @export
#' @inheritParams dplyr::copy_to
#' @rdname dplyr_source_function_implementations
#' @keywords internal
copy_to.PrestoConnection <- function(dest, df, name = deparse(substitute(df)),
                                     overwrite = FALSE, ..., with = NULL) {
  # Wrap the bare connection in a src_presto and re-dispatch copy_to() on the
  # wrapped source, forwarding every argument unchanged.
  presto_src <- src_presto(con = dest)
  copy_to(
    dest = presto_src, df = df, name = name, overwrite = overwrite,
    ...,
    with = with
  )
}
# Internal helper behind compute.tbl_presto(): either runs db_compute() to
# store the query result as a table (default), or registers the rendered query
# as a common table expression on the connection's session (cte = TRUE).
# Returns the name under which the result can be referenced.
.compute_tbl_presto <- function(x, name, temporary = FALSE, ..., cte = FALSE) {
  name <- unname(name)

  if (!identical(cte, TRUE)) {
    # Regular path: render the query and have the database compute it.
    presto_con <- dbplyr::remote_con(x)
    rendered <- dbplyr::db_sql_render(presto_con, x, use_presto_cte = TRUE)
    return(
      dbplyr::db_compute(presto_con, name, rendered, temporary = temporary, ...)
    )
  }

  # CTE path: a bare remote table has no pending operations to store.
  if (inherits(x$lazy_query, "lazy_base_remote_query")) {
    stop(
      "No operations need to be computed. Aborting compute.",
      call. = FALSE
    )
  }
  presto_con <- dbplyr::remote_con(x)
  # We need to specify sql_options here so that use_presto_cte is passed to
  # db_sql_render correctly
  # (see https://github.com/tidyverse/dbplyr/issues/1394)
  rendered <- dbplyr::db_sql_render(
    con = presto_con, sql = x,
    sql_options = dbplyr::sql_options(), use_presto_cte = FALSE
  )
  # Register the query under `name`, replacing any CTE of the same name.
  presto_con@session$addCTE(name, rendered, replace = TRUE)
  name
}
#' S3 implementation of `compute` for Presto.
#'
#' @importFrom dplyr compute
#' @importFrom dplyr %>%
#' @importFrom rlang !!!
#' @export
#' @param x A lazy data frame backed by a database query.
#' @param cte `r lifecycle::badge("experimental")`
#' An experimental feature to save the query to a common table expression.
#' Default to FALSE. See `vignette("common-table-expressions")`
#' @rdname dplyr_function_implementations
compute.tbl_presto <- function(x, name, temporary = FALSE, ..., cte = FALSE) {
  # Materialise the query (or register it as a CTE) and get back its name.
  computed_name <- .compute_tbl_presto(
    x = x, name = name, temporary = temporary, ..., cte = cte
  )
  # Point a fresh tbl at the computed result, reusing the known column names.
  computed_tbl <- dplyr::tbl(
    src = dbplyr::remote_src(x), from = computed_name, vars = colnames(x)
  )
  # Carry the grouping and window ordering of the input over to the new tbl.
  regrouped <- dplyr::group_by(
    computed_tbl, !!!rlang::syms(dbplyr::op_grps(x))
  )
  dbplyr::window_order(regrouped, !!!dbplyr::op_sort(x))
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.