R/ops.R

Defines functions op_vars.tbl_df op_vars.tbl_kusto_abstract op_vars.op_set_op op_vars.op_semi_join op_vars.op_join op_vars.op_join op_vars.op_single op_vars.op_mutate op_vars.op_distinct op_vars.op_summarise op_vars.op_rename op_vars.op_select op_vars.op_base op_vars op_grps.tbl_df op_grps.tbl_kusto_abstract op_grps.op_double op_grps.op_single op_grps.op_rename op_grps.op_summarise op_grps.op_ungroup op_grps.op_group_by op_grps.op_base op_grps add_suffixes join_vars add_op_set_op add_op_join op_double add_op_single op_single op_base_remote op_base_local op_base

Documented in add_op_join add_op_set_op add_op_single op_base op_double op_grps op_single op_vars

#' The "base case" operation: the source tbl itself plus its column variables
#'
#' Every chain of operations bottoms out in one of these objects.
#' @export
#' @param x A tbl object
#' @param vars A character vector of column variables in the tbl
#' @param class Suffix used to build the `op_base_<class>` subclass; the default
#' `character()` adds no subclass.
#' @return A list with elements `x` and `vars`, classed
#' `c("op_base_<class>", "op_base", "op")`.
op_base <- function(x, vars, class = character())
{
    stopifnot(is.character(vars))

    base_op <- list(x = x, vars = vars)
    # An empty `class` makes paste0() return character(0), so no subclass is added
    class_chain <- c(paste0("op_base_", class), "op_base", "op")

    structure(base_op, class = class_chain)
}

op_base_local <- function(df)
{
    # The variables of a local source are the names of `df`
    col_names <- names(df)
    op_base(df, col_names, class = "local")
}

op_base_remote <- function(x, vars)
{
    # Variables of a remote source must be supplied by the caller
    remote_op <- op_base(x, vars, class = "remote")
    remote_op
}

#' Construct an operation object for a single-table verb
#' @export
#' @param name The name of the operation verb, e.g. "select", "filter"
#' @param x The tbl object (or preceding operation) the verb applies to
#' @param dots Expressions passed to the operation verb function
#' @param args Other arguments passed to the operation verb function
#' @return A list with elements `name`, `x`, `dots` and `args`, classed
#' `c("op_<name>", "op_single", "op")`.
op_single <- function(name, x, dots = list(), args = list())
{
    verb_op <- list(name = name, x = x, dots = dots, args = args)
    class_chain <- c(paste0("op_", name), "op_single", "op")

    structure(verb_op, class = class_chain)
}

#' Append a single-table verb operation to a tbl_kusto object's ops
#'
#' The existing `.data$ops` becomes the `x` of the new operation, which then
#' replaces `.data$ops`.
#' @export
#' @param name The name of the operation, e.g. 'select', 'filter'
#' @param .data The tbl_kusto object to append the operation to
#' @param dots The expressions passed as arguments to the operation verb
#' @param args Other non-expression arguments passed to the operation verb
#' @return `.data` with its `ops` element replaced by the new operation.
add_op_single <- function(name, .data, dots = list(), args = list())
{
    new_op <- op_single(name, x = .data$ops, dots = dots, args = args)
    .data$ops <- new_op
    .data
}

#' Construct an operation object for a double-table verb, e.g. joins, set ops
#' @export
#' @param name The name of the operation, e.g. 'left_join', 'union_all'
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param args Other arguments passed to the operation verb
#' @return A list with elements `name`, `x`, `y` and `args`, classed
#' `c("op_<name>", "op_double", "op")`.
op_double <- function(name, x, y, args = list())
{
    verb_op <- list(name = name, x = x, y = y, args = args)
    class_chain <- c(paste0("op_", name), "op_double", "op")

    structure(verb_op, class = class_chain)
}

#' Append a join operation to the tbl_kusto object's ops list
#' @export
#' @param type The name of the join type,
#' one of: inner_join, left_join, right_join, full_join, semi_join, anti_join
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param by A vector of column names; keys by which tbl x and tbl y will be joined
#' @param suffix A character vector of length 2 whose elements are appended to the names of non-join key columns that exist in both tbl x and tbl y, to distinguish them by source tbl. `NULL` (the default) is treated as `c(".x", ".y")`.
#' @param .strategy A strategy hint to provide to Kusto.
#' @param .shufflekeys A character vector of column names to shuffle on, if `.strategy = "shuffle"`.
#' @param .remote A strategy hint to provide to Kusto for cross-cluster joins.
#' @param .num_partitions The number of partitions for a shuffle query.
#' @return `x` with its `ops` element replaced by an `op_join` operation whose
#' `args` record the computed output variables and all join settings.
add_op_join <- function(type, x, y, by = NULL, suffix = NULL,
                       .strategy = NULL, .shufflekeys = NULL, .num_partitions = NULL, .remote = NULL)
{
    # join_vars() rejects anything but a length-2 character vector, and passing
    # suffix = NULL explicitly would override its own default. Resolve NULL here
    # so the default of this function is actually usable.
    if (is.null(suffix))
        suffix <- c(".x", ".y")

    by <- common_by(by, x, y)
    vars <- join_vars(op_vars(x), op_vars(y), type = type, by = by, suffix = suffix)
    x$ops <- op_double("join", x, y,
                       args = list(
                           vars = vars,
                           type = type,
                           by = by,
                           suffix = suffix,
                           .strategy = .strategy,
                           .shufflekeys = .shufflekeys,
                           .num_partitions = .num_partitions,
                           .remote = .remote
                       ))
    x
}

#' Append a set operation to the tbl_kusto object's ops list
#' @export
#' @param x The "left" tbl
#' @param y The "right" tbl
#' @param type The type of set operation to perform, currently only supports union_all
#' @return `x` with its `ops` element replaced by an `op_set_op` operation.
add_op_set_op <- function(x, y, type)
{
    new_op <- op_double("set_op", x, y, args = list(type = type))
    x$ops <- new_op
    x
}

# Work out the columns of a join result.
#
# Returns a list of 3 parallel vectors; at each position:
#   alias - name of the column in the join result
#   x     - name of the source column in the left table, or NA if only from the right
#   y     - name of the source column in the right table, or NA if only from the left
join_vars <- function(x_names, y_names, type, by, suffix = c(".x", ".y"))
{
    # The right table's join keys are not emitted as separate columns
    y_names <- setdiff(y_names, by$y)

    suffix_ok <- is.character(suffix) && length(suffix) == 2
    if (!suffix_ok)
        stop("`suffix` must be a character vector of length 2.", call. = FALSE)

    # Disambiguate names that appear on both sides
    left_alias <- add_suffixes(x_names, y_names, suffix[1])
    right_alias <- add_suffixes(y_names, x_names, suffix[2])

    # Left and inner joins: key values come only from x
    # Right joins: key values come only from y
    # Full joins: key values are coalesced from x and y
    keys_from_x_only <- type == "left_join" | type == "inner_join"
    left_src_x <- x_names
    left_src_y <- by$y[match(x_names, by$x)]
    left_src_y[keys_from_x_only] <- NA
    left_src_x[type == "right_join" & !is.na(left_src_y)] <- NA

    # Remaining right-table columns have no left-table source
    right_src_x <- rep_len(NA, length(y_names))

    list(
        alias = c(left_alias, right_alias),
        x = c(left_src_x, right_src_x),
        y = c(left_src_y, y_names)
    )
}

# Make each name in `x` distinct from the names in `y` and from the names
# already produced, by appending `suffix` as many times as needed.
# An empty-string suffix returns `x` unchanged.
add_suffixes <- function(x, y, suffix)
{
    if (identical(suffix, ""))
        return(x)

    renamed <- rep_len(na_chr, length(x))
    for (idx in seq_along(x))
    {
        candidate <- x[[idx]]
        repeat
        {
            # Stop once the candidate clashes with neither y nor earlier results
            if (!(candidate %in% y) && !(candidate %in% renamed))
                break
            candidate <- paste0(candidate, suffix)
        }
        renamed[[idx]] <- candidate
    }
    renamed
}

#' Look up the grouping variables in effect for an operation
#'
#' The result depends on the data source and the preceding sequence of
#' operations; S3 methods resolve it per operation class.
#' @param op An operation instance
#' @export
op_grps <- function(op)
{
    UseMethod("op_grps")
}

#' @export
op_grps.op_base <- function(op)
{
    # A bare source has no grouping
    character(0)
}

#' @export
op_grps.op_group_by <- function(op)
{
    new_grps <- names(op$dots)

    # Without add = TRUE the new groups replace any existing ones
    if (!isTRUE(op$args$add))
        return(new_grps)

    union(op_grps(op$x), new_grps)
}

#' @export
op_grps.op_ungroup <- function(op)
{
    # Ungrouping clears every grouping variable
    character(0)
}

#' @export
op_grps.op_summarise <- function(op)
{
    # The grouping of the input is carried through unchanged. Return the value
    # directly: ending the body with an assignment would return it invisibly.
    op_grps(op$x)
}

#' @export
op_grps.op_rename <- function(op)
{
    # Apply the renames to the incoming groups; .strict = FALSE is passed so
    # renames that target non-group columns are tolerated here.
    incoming_grps <- op_grps(op$x)
    renamed <- tidyselect::vars_rename(incoming_grps, !!! op$dots, .strict = FALSE)
    names(renamed)
}

#' @export
op_grps.op_single <- function(op)
{
    # Fallback for single-table verbs: grouping is inherited from the input
    input_op <- op$x
    op_grps(input_op)
}
#' @export
op_grps.op_double <- function(op)
{
    # Two-table verbs take their grouping from the left-hand input
    left_op <- op$x
    op_grps(left_op)
}

#' @export
op_grps.tbl_kusto_abstract <- function(op)
{
    # Delegate to the operation stored on the tbl
    stored_ops <- op$ops
    op_grps(stored_ops)
}

#' @export
op_grps.tbl_df <- function(op)
{
    # This method reports no grouping variables for a tbl_df
    character(0)
}

#' Look up the variables in scope for a given operation
#'
#' The result depends on the data source and the preceding sequence of
#' operations; S3 methods resolve it per operation class.
#' @param op An operation instance
#' @export
op_vars <- function(op)
{
    UseMethod("op_vars")
}

#' @export
op_vars.op_base <- function(op)
{
    # A base op carries its variables directly
    base_vars <- op$vars
    base_vars
}

#' @export
op_vars.op_select <- function(op)
{
    # Grouping variables are passed as .include alongside the selection
    available <- op_vars(op$x)
    picked <- tidyselect::vars_select(available, !!! op$dots, .include = op_grps(op$x))
    names(picked)
}

#' @export
op_vars.op_rename <- function(op)
{
    # Apply the renames to the incoming variables and report the new names
    incoming_vars <- op_vars(op$x)
    renamed <- tidyselect::vars_rename(incoming_vars, !!! op$dots)
    names(renamed)
}

#' @export
op_vars.op_summarise <- function(op)
{
    # Output is the input's grouping columns followed by the summary columns
    grp_cols <- op_grps(op$x)
    summary_cols <- names(op$dots)
    c(grp_cols, summary_cols)
}

#' @export
op_vars.op_distinct <- function(op)
{
    incoming_vars <- op_vars(op$x)

    # With no expressions the variables are those of the input
    if (is_empty(op$dots))
        return(incoming_vars)

    unique(c(incoming_vars, names(op$dots)))
}

#' @export
op_vars.op_mutate <- function(op)
{
    # New columns are appended; a name that already exists keeps its position
    incoming_vars <- op_vars(op$x)
    mutated_cols <- names(op$dots)
    unique(c(incoming_vars, mutated_cols))
}

#' @export
op_vars.op_single <- function(op)
{
    # Fallback for single-table verbs: variables are inherited from the input
    input_op <- op$x
    op_vars(input_op)
}

#' @export
op_vars.op_join <- function(op)
{
    # The join's output column names are stored on the op as args$vars$alias
    # (add_op_join() fills this in from join_vars())
    op$args$vars$alias
}

#' @export
op_vars.op_semi_join <- function(op)
{
    # Reports only the left-hand input's variables
    left_op <- op$x
    op_vars(left_op)
}

#' @export
op_vars.op_set_op <- function(op)
{
    # The result has every variable that appears on either side, left side first
    left_vars <- op_vars(op$x)
    right_vars <- op_vars(op$y)
    union(left_vars, right_vars)
}

#' @export
op_vars.tbl_kusto_abstract <- function(op)
{
    # Delegate to the operation stored on the tbl
    stored_ops <- op$ops
    op_vars(stored_ops)
}

#' @export
op_vars.tbl_df <- function(op)
{
    # The variables of a local data frame are its names
    col_names <- names(op)
    col_names
}
Azure/AzureKusto documentation built on Oct. 16, 2023, 7:04 p.m.