R/tbl.R

Defines functions add_params_to_quosure print.tbl_kusto_abstract export.tbl_kusto export.kusto_database_endpoint export kusto_export_cmd compute.tbl_kusto generate_table_name collect.tbl_kusto tbl_kusto show_query.tbl_kusto_abstract group_vars.tbl_kusto_abstract tbl_vars.tbl_kusto_abstract intersect.tbl_kusto_abstract setequal.tbl_kusto_abstract setdiff.tbl_kusto_abstract union.tbl_kusto_abstract union_all.tbl_kusto_abstract anti_join.tbl_kusto_abstract semi_join.tbl_kusto_abstract full_join.tbl_kusto_abstract right_join.tbl_kusto_abstract left_join.tbl_kusto_abstract inner_join.tbl_kusto_abstract slice_sample.tbl_kusto_abstract head.tbl_kusto_abstract nest.tbl_kusto_abstract unnest.tbl_kusto_abstract summarise.tbl_kusto_abstract ungroup.tbl_kusto_abstract group_by.tbl_kusto_abstract arrange.tbl_kusto_abstract mutate.tbl_kusto_abstract filter.tbl_kusto_abstract rename.tbl_kusto_abstract distinct.tbl_kusto_abstract select.tbl_kusto_abstract tbl_kusto_abstract

Documented in anti_join.tbl_kusto_abstract collect.tbl_kusto compute.tbl_kusto export export.kusto_database_endpoint export.tbl_kusto full_join.tbl_kusto_abstract inner_join.tbl_kusto_abstract kusto_export_cmd left_join.tbl_kusto_abstract nest.tbl_kusto_abstract right_join.tbl_kusto_abstract semi_join.tbl_kusto_abstract show_query.tbl_kusto_abstract summarise.tbl_kusto_abstract tbl_kusto tbl_kusto_abstract unnest.tbl_kusto_abstract

#' Create a local lazy tbl
#'
#' Builds a Kusto tbl that is backed by a local data frame rather than a
#' remote table. Useful for testing KQL generation without a remote connection.
#'
#' @param df A data frame supplying the columns of the simulated table.
#' @param table_name The table name to use in the generated KQL.
#' @param ... Parameters stored on the tbl; `filter()` and `mutate()` make them
#'   available when evaluating their expressions.
#' @keywords internal
#' @export
#' @examples
#' library(dplyr)
#' df <- data.frame(x = 1, y = 2)
#'
#' df <- tbl_kusto_abstract(df, "table1")
#' df %>%
#'     summarise(x = sd(x)) %>%
#'     show_query()
tbl_kusto_abstract <- function(df, table_name, ...) {
    params <- list(...)
    # Stand-in endpoint: there is no real server or database behind a local tbl.
    local_src <- list(
        database = "local_df",
        server = "local_df",
        table = escape(ident(table_name))
    )
    class(local_src) <- "kusto_database_endpoint"
    make_tbl("kusto_abstract", ops = op_base_local(df), src = local_src, params = params)
}

# Register the S3 class chain with the methods package (S4 interoperability).
setOldClass(Classes = c("tbl_kusto_abstract", "tbl"))

#' @export
select.tbl_kusto_abstract <- function(.data, ...) {
    # The selection expressions are recorded as captured, without partial evaluation.
    add_op_single("select", .data, dots = quos(...))
}

#' @export
distinct.tbl_kusto_abstract <- function(.data, ...) {
    # Partially evaluate the expressions against the tbl's column names.
    evaluated <- partial_eval(quos(...), vars = op_vars(.data))
    add_op_single("distinct", .data, dots = evaluated)
}

#' @export
rename.tbl_kusto_abstract <- function(.data, ...) {
    renames <- quos(...)
    add_op_single("rename", .data, dots = renames)
}

#' @export
filter.tbl_kusto_abstract <- function(.data, ...) {
    # Make the tbl's params visible to every predicate, then partially
    # evaluate the predicates against the tbl's column names.
    predicates <- lapply(quos(...), add_params_to_quosure, params = .data$params)
    predicates <- partial_eval(predicates, vars = op_vars(.data))
    add_op_single("filter", .data, dots = predicates)
}

#' @export
mutate.tbl_kusto_abstract <- function(.data, ...) {
    # .named = TRUE auto-names any expression the caller left unnamed.
    new_cols <- quos(..., .named = TRUE)
    # Make the tbl's params visible to each expression before partial evaluation.
    new_cols <- lapply(new_cols, add_params_to_quosure, params = .data$params)
    new_cols <- partial_eval(new_cols, vars = op_vars(.data))
    add_op_single("mutate", .data, dots = new_cols)
}

#' @export
arrange.tbl_kusto_abstract <- function(.data, ...) {
    sort_keys <- partial_eval(quos(...), vars = op_vars(.data))
    # Sort keys are recorded without any names the caller may have supplied.
    add_op_single("arrange", .data, dots = unname(sort_keys))
}

#' @export
group_by.tbl_kusto_abstract <- function(.data, ..., add = FALSE, .add = add, .drop = NULL) {
    # `.add` is the argument name used by current dplyr; `add` is kept for
    # backward compatibility and supplies `.add`'s default. Declaring both here
    # stops `group_by(x, a, .add = TRUE)` from capturing `.add = TRUE` in `...`
    # as if it were a grouping expression. `.drop` is accepted only for
    # agreement with the dplyr generic and is not used.
    dots <- quos(...)
    dots <- partial_eval(dots, vars = op_vars(.data))
    if (is_empty(dots)) {
        return(.data)
    }
    groups <- group_by_prepare(.data, !!!dots, .add = .add)
    names <- vapply(groups$groups, as_string, character(1))
    add_op_single("group_by",
        groups$data,
        dots = set_names(groups$groups, names),
        # group_by_prepare() is expected to have merged any existing groups
        # into groups$groups when `.add` is TRUE, so the op replaces grouping.
        args = list(add = FALSE)
    )
}

#' @export
ungroup.tbl_kusto_abstract <- function(x, ...) {
    # Extra arguments in `...` are not used.
    op_name <- "ungroup"
    add_op_single(op_name, x)
}

#' Summarise method for Kusto tables
#'
#' This method is the same as other summarise methods, with the exception of the `.strategy`, `.shufflekeys` and `.num_partitions` optional arguments. They provide hints to the Kusto engine on how to execute the summarisation, and can sometimes be useful to speed up a query. See the Kusto documentation for more details.
#'
#' @param .data A Kusto tbl.
#' @param ... Summarise expressions.
#' @param .strategy A summarise strategy to pass to Kusto. Currently the only value supported is "shuffle".
#' @param .shufflekeys A character vector of column names to use as shuffle keys.
#' @param .num_partitions The number of partitions for a shuffle query.
#' @seealso
#' [dplyr::summarise]
#'
#' @examples
#' \dontrun{
#'
#' tbl1 <- tbl_kusto(db, "table1")
#'
#' ## standard dplyr syntax:
#' summarise(tbl1, mx = mean(x))
#'
#' ## Kusto extensions:
#' summarise(tbl1, mx = mean(x), .strategy = "shuffle") # a shuffle summarise
#'
#' summarise(tbl1, mx = mean(x), .shufflekeys = c("var1", "var2")) # a shuffle summarise with keys
#'
#' summarise(tbl1, mx = mean(x), .num_partitions = 5) # no. of partitions for a shuffle summarise
#' }
#'
#' @rdname summarise
#' @export
summarise.tbl_kusto_abstract <- function(.data, ..., .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL) {
    # .named = TRUE auto-names any summary expression the caller left unnamed.
    dots <- quos(..., .named = TRUE)
    dots <- partial_eval(dots, vars = op_vars(.data))
    # The Kusto execution hints travel with the op as plain arguments.
    add_op_single("summarise", .data,
        dots = dots,
        args = list(.strategy = .strategy, .shufflekeys = .shufflekeys, .num_partitions = .num_partitions)
    )
}

#' Unnest method for Kusto tables
#'
#' Expands a list column so that each element of the list gets its own row.
#' unnest() translates to Kusto's mv-expand operator.
#'
#' @param data A Kusto tbl.
#' @param cols Specification of columns to unnest.
#' @param ... `r lifecycle::badge("deprecated")`:
#'   previously you could write `df %>% unnest(x, y, z)`.
#'   Convert to `df %>% unnest(c(x, y, z))`. If you previously created a new
#'   variable in `unnest()` you'll now need to do it explicitly with `mutate()`.
#'   Convert `df %>% unnest(y = fun(x, y, z))`
#'   to `df %>% mutate(y = fun(x, y, z)) %>% unnest(y)`.
#' @param keep_empty Needed for agreement with generic. Not otherwise used. Kusto does not keep empty rows.
#' @param ptype Needed for agreement with generic. Not otherwise used.
#' @param names_sep Needed for agreement with generic. Not otherwise used.
#' @param names_repair Needed for agreement with generic. Not otherwise used.
#' @param .drop Needed for agreement with generic. Not otherwise used.
#' @param .id Data frame identifier - if supplied, will create a new column with name .id, giving a unique identifier. This is most useful if the list column is named.
#' @param .sep Needed for agreement with generic. Not otherwise used.
#' @param .preserve Needed for agreement with generic. Not otherwise used.
#' @export
unnest.tbl_kusto_abstract <- function(data, cols, ..., keep_empty = FALSE, ptype = NULL,
                                      names_sep = NULL, names_repair = NULL, .drop = NULL,
                                      .id = NULL, .sep = NULL, .preserve = NULL) {
    # Only `cols` and `.id` reach the recorded op; every other argument exists
    # solely to match the generic's signature.
    col_spec <- enquo(cols)
    add_op_single("unnest", data, dots = col_spec, args = list(.id = .id))
}

#' Nest method for Kusto tables
#'
#' Collapses the selected columns into list columns, producing one row per
#' combination of the remaining columns (plus any existing groups).
#'
#' @param .data A kusto tbl.
#' @param ... Specification of columns to nest. Translates to summarize make_list() in Kusto.
#'   If nothing is selected, every column is nested.
#' @export
nest.tbl_kusto_abstract <- function(.data, ...) {
    all_vars <- op_vars(.data)
    nest_vars <- unname(tidyselect::vars_select(all_vars, ...))

    if (is_empty(nest_vars)) {
        nest_vars <- all_vars
    }

    # Everything not being nested (and every existing group) becomes a key.
    group_vars <- union(op_grps(.data), setdiff(all_vars, nest_vars))
    nest_vars <- setdiff(nest_vars, group_vars)

    # One make_list(<col>) aggregation per nested column, named after the column.
    # lapply() + explicit names avoids mapply()'s implicit simplification.
    dot_calls <- lapply(nest_vars, function(x) expr(make_list(!!as.name(x))))
    names(dot_calls) <- nest_vars

    if (is_empty(group_vars)) {
        summarise(.data, !!!dot_calls)
    } else {
        # Splice every key column; `!!as.name(group_vars)` kept only the first.
        summarise(group_by(.data, !!!lapply(group_vars, as.name)), !!!dot_calls)
    }
}

#' @export
head.tbl_kusto_abstract <- function(x, n = 6L, ...) {
    # Only the row count is recorded; `...` is not used.
    row_limit <- list(n = n)
    add_op_single("head", x, args = row_limit)
}

#' @export
slice_sample.tbl_kusto_abstract <- function(.data, ..., n = 6L, prop, by, weight_by, replace) {
    # The recorded op carries only a row count. `by`, `weight_by` and `replace`
    # exist for agreement with the dplyr generic and are not used. A proportion
    # cannot be expressed, so reject it rather than silently returning `n` rows.
    if (!missing(prop)) {
        stop("Kusto does not support slice_sample() with `prop`. Please use `n` instead.")
    }
    add_op_single("slice_sample", .data, args = list(n = n))
}

#' Join methods for Kusto tables
#'
#' These methods behave like the other dplyr join methods, except that they also
#' accept the optional `.strategy`, `.shufflekeys`, `.num_partitions` and
#' `.remote` arguments. These give the Kusto engine hints on how to execute the
#' join and can sometimes be useful to speed up a query. See the Kusto
#' documentation for more details.
#'
#' @param x,y Kusto tbls.
#' @param by The columns to join on.
#' @param copy Needed for agreement with generic. Not otherwise used.
#' @param suffix The suffixes to use for deduplicating column names.
#' @param ... Other arguments passed to lower-level functions.
#' @param keep Needed for agreement with generic. Not otherwise used. Kusto retains keys from both sides of joins.
#' @param .strategy A join strategy hint to pass to Kusto. Currently the values supported are "shuffle" and "broadcast".
#' @param .shufflekeys A character vector of column names to use as shuffle keys.
#' @param .num_partitions The number of partitions for a shuffle query.
#' @param .remote A join strategy hint to use for cross-cluster joins. Can be "left", "right", "local" or "auto" (the default).
#' @seealso
#' [dplyr::join]
#'
#' @examples
#' \dontrun{
#'
#' tbl1 <- tbl_kusto(db, "table1")
#' tbl2 <- tbl_kusto(db, "table2")
#'
#' # standard dplyr syntax:
#' left_join(tbl1, tbl2)
#'
#' # Kusto extensions:
#' left_join(tbl1, tbl2, .strategy = "broadcast") # a broadcast join
#'
#' left_join(tbl1, tbl2, .shufflekeys = c("var1", "var2")) # shuffle join with shuffle keys
#'
#' left_join(tbl1, tbl2, .num_partitions = 5) # no. of partitions for a shuffle join
#' }
#'
#' @aliases inner_join left_join right_join full_join semi_join anti_join
#'
#' @rdname join
#' @export
inner_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
                                          keep = NULL, .strategy = NULL, .shufflekeys = NULL,
                                          .num_partitions = NULL, .remote = NULL) {
    # `copy` and `keep` are accepted for the generic's sake and not forwarded.
    add_op_join(
        "inner_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @rdname join
#' @export
left_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
                                         keep = NULL, .strategy = NULL, .shufflekeys = NULL,
                                         .num_partitions = NULL, .remote = NULL) {
    # `copy` and `keep` are accepted for the generic's sake and not forwarded.
    add_op_join(
        "left_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @rdname join
#' @export
right_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
                                          keep = NULL, .strategy = NULL, .shufflekeys = NULL,
                                          .num_partitions = NULL, .remote = NULL) {
    # `copy` and `keep` are accepted for the generic's sake and not forwarded.
    add_op_join(
        "right_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @rdname join
#' @export
full_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"), ...,
                                         keep = NULL, .strategy = NULL, .shufflekeys = NULL,
                                         .num_partitions = NULL, .remote = NULL) {
    # `copy` and `keep` are accepted for the generic's sake and not forwarded.
    add_op_join(
        "full_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @rdname join
#' @export
semi_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, ..., suffix = c(".x", ".y"),
                                         .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
                                         .remote = NULL) {
    # `copy` is accepted for the generic's sake and not forwarded.
    add_op_join(
        "semi_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @rdname join
#' @export
anti_join.tbl_kusto_abstract <- function(x, y, by = NULL, copy = NULL, suffix = c(".x", ".y"),
                                         .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL,
                                         .remote = NULL, ...) {
    # `copy` is accepted for the generic's sake and not forwarded.
    add_op_join(
        "anti_join", x, y,
        by = by,
        suffix = suffix,
        .strategy = .strategy,
        .shufflekeys = .shufflekeys,
        .num_partitions = .num_partitions,
        .remote = .remote,
        ...
    )
}

#' @export
union_all.tbl_kusto_abstract <- function(x, y, ...) {
    # The only set operation supported for Kusto tbls; `...` is not used.
    set_op <- "union_all"
    add_op_set_op(x, y, set_op)
}

#' @export
union.tbl_kusto_abstract <- function(x, y, ...) {
    # Always fails, pointing the user at union_all().
    msg <- "Kusto does not support union(). Please use union_all() instead."
    stop(msg)
}

#' @export
setdiff.tbl_kusto_abstract <- function(x, y, ...) {
    # Unsupported operation: always raises an error.
    msg <- "Kusto does not support setdiff() at this time."
    stop(msg)
}

#' @export
setequal.tbl_kusto_abstract <- function(x, y, ...) {
    # Unsupported operation: always raises an error.
    msg <- "Kusto does not support setequal() at this time."
    stop(msg)
}

#' @export
intersect.tbl_kusto_abstract <- function(x, y, ...) {
    # Unsupported operation: always raises an error.
    msg <- "Kusto does not support intersect() at this time."
    stop(msg)
}

#' @export
tbl_vars.tbl_kusto_abstract <- function(x) {
    # Column names are derived from the tbl's recorded operations.
    ops <- x$ops
    op_vars(ops)
}

#' @export
group_vars.tbl_kusto_abstract <- function(x) {
    # Grouping columns are derived from the tbl's recorded operations.
    ops <- x$ops
    op_grps(ops)
}

#' Translate a sequence of dplyr operations on a tbl into a Kusto query string.
#' @export
#' @param x A tbl_kusto or tbl_kusto_abstract instance
#' @param ... needed for agreement with generic. Not otherwise used.
#' @return The rendered Kusto query, as produced by `kql_render()`.
show_query.tbl_kusto_abstract <- function(x, ...) {
    kql_render(kql_build(x))
}

#' A tbl object representing a table in a Kusto database.
#'
#' Runs a one-row query against the table to learn its column names, then
#' returns a lazy tbl that subsequent dplyr verbs build a Kusto query on.
#'
#' @export
#' @param kusto_database An instance of kusto_database_endpoint that this table should be queried from
#' @param table_name The name of the table in the Kusto database. May also be a
#'   stored-function call such as `"MyFunction(arg1, arg2)"`; only the part
#'   before the first `(` is escaped as an identifier.
#' @param ... parameters to pass in case the Kusto source table is a parameterized function.
tbl_kusto <- function(kusto_database, table_name, ...) {
    stopifnot(inherits(kusto_database, "kusto_database_endpoint"))
    params <- list(...)
    # Escape only the identifier before the first "(" so a function call like
    # MyFunction(arg1, arg2) keeps its argument list.
    name_parts <- strsplit(table_name, split = "\\(")[[1]]
    name_parts[1] <- escape(ident(name_parts[1]))
    kusto_database$table <- paste(name_parts, collapse = "(")
    # A single-row probe is enough to discover the column names.
    probe <- run_query(kusto_database, sprintf("%s | take 1", kusto_database$table), ...)
    ops <- op_base_remote(table_name, names(probe))
    make_tbl(c("kusto", "kusto_abstract"), src = kusto_database, ops = ops, params = params)
}

#' Compile the preceding dplyr operations into a kusto query, execute it on the remote server,
#' and return the result as a tibble.
#' @export
#' @param x An instance of class tbl_kusto representing a Kusto table
#' @param ... needed for agreement with generic. Not otherwise used.
collect.tbl_kusto <- function(x, ...) {
    rendered <- kql_render(kql_build(x))
    # Stored tbl params and any extra arguments are forwarded to run_query(),
    # together with the endpoint and the rendered query.
    call_args <- c(x$params, list(...))
    call_args$database <- x$src
    call_args$qry_cmd <- rendered
    tibble::as_tibble(do.call(run_query, call_args))
}

# Random table name: "Rtbl_" followed by 8 distinct lowercase letters.
generate_table_name <- function() {
    suffix <- paste(sample(letters, 8), collapse = "")
    paste0("Rtbl_", suffix)
}

#' Execute the query, store the results in a table, and return a reference to the new table
#' @export
#' @param x An instance of class tbl_kusto representing a Kusto table
#' @param ... other parameters passed to the query
#' @param name The name for the Kusto table to be created.
#' If name is omitted, the table will be named Rtbl_ + 8 distinct random lowercase letters
#' @return Invisibly, a tbl_kusto referencing the newly created table.
compute.tbl_kusto <- function(x, ..., name = generate_table_name()) {
    q_str <- kql_render(kql_build(x))
    new_tbl_name <- kql_escape_ident(name)
    # Prefix the query with a `.set <table> <|` command so its output is
    # written to the new table.
    set_cmd <- kql(paste0(".set ", new_tbl_name, " <|\n"))
    q_str <- kql(paste0(set_cmd, q_str))
    params <- c(x$params, list(...))
    params$database <- x$src
    params$qry_cmd <- q_str
    # Run for its side effect; the command's own output is not needed.
    do.call(run_query, params)
    invisible(tbl_kusto(x$src, name))
}

#' Build a Kusto `.export` command
#'
#' Wraps a Kusto query in an `.export` command that writes the query results to
#' Azure Storage in Parquet, CSV, TSV or JSON format.
#'
#' @param query The text of the Kusto query to run
#' @param storage_uri The URI of the blob storage container to export to
#' @param name_prefix The filename prefix for each exported file
#' @param key The account key for the storage container, or "impersonate" to
#' use the identity that is signed into Kusto to authenticate to Azure Storage.
#' @param format Options are "parquet", "csv", "tsv", "json"
#' @param distributed logical, indicates whether Kusto should distribute the
#' export job to multiple nodes, in which case multiple files will be written
#' to storage concurrently.
#' @return A single string containing the `.export` command.
kusto_export_cmd <- function(
    query, storage_uri, name_prefix, key, format,
    distributed) {
    if (!(format %in% c("parquet", "csv", "tsv", "json"))) {
        stop("Format must be one of parquet, csv, tsv, or json.")
    }
    # Make sure the storage uri ends with a slash
    if (!endsWith(storage_uri, "/")) {
        storage_uri <- paste0(storage_uri, "/")
    }
    # Scalar flags: plain if/else rather than the vectorised ifelse()
    distr <- if (distributed) "true" else "false"
    # Parquet output is snappy-compressed; the text formats are gzipped
    compr <- if (format == "parquet") "snappy" else "gzip"
    sprintf(".export
compressed
to %s (h@'%s%s;%s')
with (
sizeLimit=1073741824,
namePrefix='%s',
fileExtension='%s',
compressionType='%s',
includeHeaders='firstFile',
encoding='UTF8NoBOM',
distributed=%s
)
<|
%s
", format, storage_uri, name_prefix, key, name_prefix, format, compr, distr, query)
}

#' Execute the Kusto query and export the result to Azure Storage.
#' @param tbl An object representing a table or database.
#' @param storage_uri The Azure Storage URI to export files to.
#' @param query A Kusto query string. Required when `tbl` is a database
#' endpoint; not used when `tbl` is a tbl_kusto, whose own query is exported.
#' @param name_prefix The filename prefix to use for exported files.
#' @param key Either "impersonate" (the default), which uses the account signed
#' into Kusto to authenticate to Azure Storage, or an Azure Storage account key.
#' @param format Options are "parquet", "csv", "tsv", "json"
#' @param distributed logical, indicates whether Kusto should distribute the
#' export job to multiple nodes, in which case multiple files will be written
#' to storage concurrently.
#' @param ... Further arguments passed on by the methods when running the query.
#' @rdname export
#' @export
export <- function(
    tbl, storage_uri, query = NULL, name_prefix = "export",
    key = "impersonate", format = "parquet", distributed = FALSE, ...) {
    # Dispatch on the class of `tbl`.
    UseMethod("export", tbl)
}

#' @rdname export
#' @export
export.kusto_database_endpoint <- function(
    tbl, storage_uri, query = NULL, name_prefix = "export",
    key = "impersonate", format = "parquet", distributed = FALSE, ...) {
    # `query` defaults to NULL, so test for NULL rather than missing(): this
    # also covers an explicit `query = NULL`, which previously failed with an
    # unhelpful "argument is of length zero" error.
    if (is.null(query)) stop("query parameter is required.")
    # Management commands (starting with ".") cannot be wrapped in .export;
    # ignore leading whitespace so "  .show ..." is rejected too.
    if (startsWith(trimws(as.character(query)), ".")) {
        stop("Management commands cannot be used with export()")
    }
    q_str <- kusto_export_cmd(
        query = query, storage_uri = storage_uri,
        name_prefix = name_prefix, key = key, format = format,
        distributed = distributed
    )
    run_query(tbl, q_str, ...)
}

#' @rdname export
#' @export
export.tbl_kusto <- function(
    tbl, storage_uri, query = NULL, name_prefix = "export",
    key = "impersonate", format = "parquet", distributed = FALSE, ...) {
    # The exported query is the tbl's own accumulated query; `query` is unused.
    endpoint <- tbl$src
    rendered <- kql_render(kql_build(tbl))
    export_cmd <- kusto_export_cmd(
        query = rendered,
        storage_uri = storage_uri,
        name_prefix = name_prefix,
        key = key,
        format = format,
        distributed = distributed
    )
    tibble::as_tibble(run_query(endpoint, export_cmd, ...))
}

#' @keywords internal
#' @export
print.tbl_kusto_abstract <- function(x, ...) {
    # A tbl with pending operations prints its query; a bare table prints
    # either its remote URL or its simulated local name.
    if (!inherits(x$ops, "op_base")) {
        cat("<Kusto query>\n")
        print(show_query(x))
        return(invisible(x))
    }
    if (inherits(x, "tbl_kusto")) {
        cat("<Kusto table '")
        table_url <- httr::parse_url(x$src$server)
        table_url$path <- file.path(x$src$database, x$src$table)
        cat(httr::build_url(table_url), "'>\n", sep = "")
    } else {
        cat("<Simulated Kusto table '")
        cat(paste0("local_df/", x$src$table), "'>\n", sep = "")
    }
    invisible(x)
}

# Return `quosure` re-homed in an environment where the tbl params are bound.
#
# The params are bound in a fresh child of the quosure's environment rather
# than in that environment itself. The old version wrote them into the
# caller's own environment (often the global environment) as a side effect,
# and failed for quosures whose environment is the empty environment.
# Other symbols still resolve through the parent chain, so lookups are
# unaffected apart from the params shadowing same-named variables.
#
# quosure: a quosure, as captured by quos().
# params:  a named list of values (the tbl's params); NULL or empty is a no-op.
add_params_to_quosure <- function(quosure, params) {
    if (length(params) == 0) {
        return(quosure)
    }
    new_env <- list2env(params, envir = new.env(parent = get_env(quosure)))
    quo_set_env(quosure, new_env)
}

Try the AzureKusto package in your browser

Any scripts or data that you put into this service are public.

AzureKusto documentation built on Oct. 12, 2023, 5:11 p.m.