Nothing
# Join key parsing: handles by = "col", by = c("a" = "b"), or NULL (natural)
parse_join_keys <- function(x, y, by) {
  # Column names available on each side, read from the node schemas.
  x_cols <- .Call(C_node_schema, x$.node)$name
  y_cols <- .Call(C_node_schema, y$.node)$name

  # by = NULL: natural join on every column name the two sides share.
  if (is.null(by)) {
    shared <- intersect(x_cols, y_cols)
    if (length(shared) == 0) {
      stop("no common columns found for natural join; specify 'by'")
    }
    message(sprintf("Joining by: %s", paste(shared, collapse = ", ")))
    return(list(left = shared, right = shared))
  }

  by_names <- names(by)

  # by = c("a", "b"): the same column name is used on both sides.
  if (is.null(by_names)) {
    in_x <- by %in% x_cols
    in_y <- by %in% y_cols
    offenders <- which(!(in_x & in_y))
    if (length(offenders) > 0) {
      # Report the first offending name; the x side is checked before y.
      first <- offenders[1]
      if (!in_x[first]) {
        stop(sprintf("column '%s' not found in x", by[first]))
      }
      stop(sprintf("column '%s' not found in y", by[first]))
    }
    return(list(left = by, right = by))
  }

  # by = c("a" = "b"): names are the left keys, values the right keys.
  left_keys <- by_names
  right_keys <- unname(by)

  # Entries without a name use the same column name on both sides.
  blank <- left_keys == ""
  left_keys[blank] <- right_keys[blank]

  # All left keys are validated before any right key.
  absent_left <- left_keys[!(left_keys %in% x_cols)]
  if (length(absent_left) > 0) {
    stop(sprintf("column '%s' not found in x", absent_left[1]))
  }
  absent_right <- right_keys[!(right_keys %in% y_cols)]
  if (length(absent_right) > 0) {
    stop(sprintf("column '%s' not found in y", absent_right[1]))
  }

  list(left = left_keys, right = right_keys)
}
#' Join two vectra tables
#'
#' @param x A `vectra_node` object (left table).
#' @param y A `vectra_node` object (right table).
#' @param by A character vector of column names to join by, or a named vector
#' like `c("a" = "b")`. `NULL` for natural join (common columns).
#' @param suffix A character vector of length 2 for disambiguating non-key
#' columns with the same name (default `c(".x", ".y")`).
#' @param ... Ignored.
#'
#' @return A `vectra_node` with the joined result.
#'
#' @details
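#' Joins are executed as a build-right, probe-left hash join: the entire
#' right-side table is materialized into a hash table and left-side batches
#' stream through, so memory cost is proportional to the right-side table size.
#' `right_join()` is implemented as a left join with its inputs swapped, so
#' for it the materialized (build) side is `x` rather than `y`.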
#'
#' NA keys never match (SQL NULL semantics). Key types are auto-coerced
#' following the `bool < int64 < double` hierarchy. Joining string against
#' numeric keys is an error.
#'
#' @examples
#' f1 <- tempfile(fileext = ".vtr")
#' f2 <- tempfile(fileext = ".vtr")
#' write_vtr(data.frame(id = c(1, 2, 3), x = c(10, 20, 30)), f1)
#' write_vtr(data.frame(id = c(1, 2, 4), y = c(100, 200, 400)), f2)
#' left_join(tbl(f1), tbl(f2), by = "id") |> collect()
#' unlink(c(f1, f2))
#'
#' @export
left_join <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("left_join", x)
}
# Internal: shared implementation for standard joins (left, inner, full)
join_impl <- function(x, y, by, suffix, type) {
  # Resolve `by` into matching left/right key-name vectors (validates columns).
  key_pairs <- parse_join_keys(x, y, by)

  # Build the native join node; `type` selects the join flavour.
  joined <- .Call(
    C_join_node, x$.node, y$.node, type,
    key_pairs$left, key_pairs$right, suffix[1], suffix[2]
  )

  # Wrap the new pointer as a vectra_node; the result has no source path.
  node <- list(.node = joined, .path = NULL)
  class(node) <- "vectra_node"
  node
}
# Internal: shared implementation for filtering joins (semi, anti)
filter_join_impl <- function(x, y, by, type) {
  # Resolve `by` into matching left/right key-name vectors (validates columns).
  key_pairs <- parse_join_keys(x, y, by)

  # Filtering joins take no user suffix, so the default pair is passed to the
  # native routine.
  filtered <- .Call(
    C_join_node, x$.node, y$.node, type,
    key_pairs$left, key_pairs$right, ".x", ".y"
  )

  # The result carries over the `.path` of `x`.
  node <- list(.node = filtered, .path = x$.path)
  class(node) <- "vectra_node"
  node
}
#' @export
left_join.vectra_node <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # The right-hand table must also be a vectra node.
  if (!inherits(y, "vectra_node")) {
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  }

  # Exactly two character suffixes are required (one per side).
  suffix_ok <- is.character(suffix) && length(suffix) == 2
  if (!suffix_ok) {
    stop(sprintf("suffix must be character(2), got %s of length %d", class(suffix)[1], length(suffix)))
  }

  join_impl(x, y, by, suffix, type = "left")
}
#' @rdname left_join
#' @export
inner_join <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("inner_join", x)
}
#' @export
inner_join.vectra_node <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # The right-hand table must also be a vectra node.
  if (!inherits(y, "vectra_node")) {
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  }

  # Exactly two character suffixes are required (one per side).
  suffix_ok <- is.character(suffix) && length(suffix) == 2
  if (!suffix_ok) {
    stop(sprintf("suffix must be character(2), got %s of length %d", class(suffix)[1], length(suffix)))
  }

  join_impl(x, y, by, suffix, type = "inner")
}
#' @rdname left_join
#' @export
right_join <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("right_join", x)
}
#' @export
right_join.vectra_node <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  if (!inherits(y, "vectra_node"))
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  if (!is.character(suffix) || length(suffix) != 2)
    stop(sprintf("suffix must be character(2), got %s of length %d", class(suffix)[1], length(suffix)))

  # right_join(x, y) is executed as left_join(y, x) with swapped keys and
  # swapped suffixes; a projection afterwards restores the x-first column
  # order.
  keys <- parse_join_keys(x, y, by)

  # Get schemas before C_join_node clears the external pointers.
  x_names <- .Call(C_node_schema, x$.node)$name
  y_names <- .Call(C_node_schema, y$.node)$name

  # Swapped join: y is passed as the first (probe) input, x as the second.
  swapped <- .Call(C_join_node, y$.node, x$.node,
                   "left", keys$right, keys$left, suffix[2], suffix[1])
  # Column names of the swapped join's output (y columns, then x non-key
  # columns, per the original implementation notes).
  cur_names <- .Call(C_node_schema, swapped)$name

  # Output layout: every x column in x order, then the non-key y columns.
  key_pos <- match(x_names, keys$left)
  x_is_key <- !is.na(key_pos)
  y_rest <- y_names[!(y_names %in% keys$right)]

  # Locate each output column in the swapped join's output.
  # - x key columns take their values from the matching y key column.
  # - non-key columns keep their name, unless the join had to suffix them.
  x_src <- x_names
  x_src[x_is_key] <- keys$right[key_pos[x_is_key]]
  x_suffixed <- !x_is_key & !(x_names %in% cur_names)
  x_src[x_suffixed] <- paste0(x_names[x_suffixed], suffix[1])

  y_src <- y_rest
  y_suffixed <- !(y_rest %in% cur_names)
  y_src[y_suffixed] <- paste0(y_rest[y_suffixed], suffix[2])

  bare <- c(x_names, y_rest)
  src <- c(x_src, y_src)
  is_key <- c(x_is_key, rep(FALSE, length(y_rest)))
  side_suffix <- c(rep(suffix[1], length(x_names)),
                   rep(suffix[2], length(y_rest)))

  # A non-key column is given its bare name only when that name is unique in
  # the result. Previously both sides of a shared non-key column were renamed
  # back to the bare name, which produced duplicate column names and dropped
  # the user's suffixes.
  clash <- (duplicated(bare) | duplicated(bare, fromLast = TRUE)) & !is_key
  desired <- bare
  desired[clash] <- paste0(bare, side_suffix)[clash]

  # NULL means "pass the column through under its current name"; a col_ref
  # entry renames the referenced source column to the desired output name.
  expr_lists <- lapply(seq_along(desired), function(i) {
    if (desired[i] == src[i]) NULL else list(kind = "col_ref", name = src[i])
  })

  new_xptr <- .Call(C_project_node, swapped, desired, expr_lists)
  structure(list(.node = new_xptr, .path = NULL), class = "vectra_node")
}
#' @rdname left_join
#' @export
full_join <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("full_join", x)
}
#' @export
full_join.vectra_node <- function(x, y, by = NULL, suffix = c(".x", ".y"), ...) {
  # The right-hand table must also be a vectra node.
  if (!inherits(y, "vectra_node")) {
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  }

  # Exactly two character suffixes are required (one per side).
  suffix_ok <- is.character(suffix) && length(suffix) == 2
  if (!suffix_ok) {
    stop(sprintf("suffix must be character(2), got %s of length %d", class(suffix)[1], length(suffix)))
  }

  join_impl(x, y, by, suffix, type = "full")
}
#' @rdname left_join
#' @export
semi_join <- function(x, y, by = NULL, ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("semi_join", x)
}
#' @export
semi_join.vectra_node <- function(x, y, by = NULL, ...) {
  # The right-hand table must also be a vectra node.
  y_is_node <- inherits(y, "vectra_node")
  if (!y_is_node) {
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  }

  filter_join_impl(x, y, by, type = "semi")
}
#' @rdname left_join
#' @export
anti_join <- function(x, y, by = NULL, ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("anti_join", x)
}
#' @export
anti_join.vectra_node <- function(x, y, by = NULL, ...) {
  # The right-hand table must also be a vectra node.
  y_is_node <- inherits(y, "vectra_node")
  if (!y_is_node) {
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  }

  filter_join_impl(x, y, by, type = "anti")
}
#' Cross join two vectra tables
#'
#' Returns every combination of rows from `x` and `y` (Cartesian product).
#' Both tables are collected before joining.
#'
#' @param x A `vectra_node` object or data.frame.
#' @param y A `vectra_node` object or data.frame.
#' @param suffix Suffixes for disambiguating column names (default `c(".x", ".y")`).
#' @param ... Ignored.
#'
#' @return A data.frame with `nrow(x) * nrow(y)` rows.
#'
#' @examples
#' f1 <- tempfile(fileext = ".vtr")
#' f2 <- tempfile(fileext = ".vtr")
#' write_vtr(data.frame(a = 1:2), f1)
#' write_vtr(data.frame(b = c("x", "y", "z"), stringsAsFactors = FALSE), f2)
#' cross_join(tbl(f1), tbl(f2))
#' unlink(c(f1, f2))
#'
#' @export
cross_join <- function(x, y, suffix = c(".x", ".y"), ...) {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("cross_join", x)
}
#' @export
cross_join.vectra_node <- function(x, y, suffix = c(".x", ".y"), ...) {
  if (!inherits(y, "vectra_node") && !is.data.frame(y))
    stop(sprintf("y must be a vectra_node or data.frame, got %s", class(y)[1]))
  if (!is.character(suffix) || length(suffix) != 2)
    stop(sprintf("suffix must be character(2), got %s of length %d", class(suffix)[1], length(suffix)))

  # Materialize lazy inputs; anything that is not a vectra_node is used as-is.
  df_x <- if (inherits(x, "vectra_node")) collect(x) else x
  df_y <- if (inherits(y, "vectra_node")) collect(y) else y
  nx <- nrow(df_x)
  ny <- nrow(df_y)

  # Cartesian product: each x row is repeated ny times in a row, while the
  # whole of y is tiled nx times, so row i of x meets every row of y.
  result_x <- df_x[rep(seq_len(nx), each = ny), , drop = FALSE]
  result_y <- df_y[rep(seq_len(ny), times = nx), , drop = FALSE]

  # Disambiguate names present on both sides. The collision masks are computed
  # once from the original names so every column is suffixed at most once; a
  # name-by-name loop would re-suffix a column whose freshly suffixed name
  # (e.g. "a.x") is itself a shared name.
  shared <- intersect(names(result_x), names(result_y))
  clash_x <- names(result_x) %in% shared
  clash_y <- names(result_y) %in% shared
  names(result_x)[clash_x] <- paste0(names(result_x)[clash_x], suffix[1])
  names(result_y)[clash_y] <- paste0(names(result_y)[clash_y], suffix[2])

  result <- cbind(result_x, result_y)
  # Row names inherited from the index expansion are not meaningful.
  rownames(result) <- NULL
  result
}
#' Fuzzy join two vectra tables by string distance
#'
#' Joins two tables using approximate string matching on key columns.
#' Optionally blocks by a second column (e.g., genus) for performance —
#' only rows sharing the same blocking key are compared.
#'
#' @param x A `vectra_node` object (probe / query side).
#' @param y A `vectra_node` object (build / reference side).
#' @param by A named character vector of length 1: `c("probe_col" = "build_col")`.
#' The columns to compute string distance on.
#' @param method Character. Distance algorithm: `"dl"` (Damerau-Levenshtein,
#' default), `"levenshtein"`, or `"jw"` (Jaro-Winkler).
#' @param max_dist Numeric. Maximum normalized distance (0-1) to keep a match.
#' Default `0.2`.
#' @param block_by Optional named character vector of length 1:
#' `c("probe_col" = "build_col")`. Rows must match exactly on these columns
#' before distance is computed. Dramatically reduces comparisons.
#' @param n_threads Integer. Number of OpenMP threads for parallel distance
#' computation over partitions. Default `4L`.
#' @param suffix Character. Suffix appended to build-side column names that
#' collide with probe-side names. Default `".y"`.
#'
#' @return A `vectra_node` with all probe columns, all build columns (suffixed
#' on collision), and a `fuzzy_dist` column (double).
#'
#' @export
fuzzy_join <- function(x, y, by, method = "dl", max_dist = 0.2,
                       block_by = NULL, n_threads = 4L,
                       suffix = ".y") {
  # S3 generic: the method is chosen from the class of `x`.
  UseMethod("fuzzy_join", x)
}
#' @export
fuzzy_join.vectra_node <- function(x, y, by, method = "dl", max_dist = 0.2,
                                   block_by = NULL, n_threads = 4L,
                                   suffix = ".y") {
  # ---- Argument validation -------------------------------------------------
  if (!inherits(y, "vectra_node"))
    stop(sprintf("y must be a vectra_node, got %s", class(y)[1]))
  if (!is.character(by) || length(by) != 1)
    stop(sprintf("by must be a named character vector of length 1, got length %d", length(by)))
  # block_by is optional, but when given it must have the same shape as `by`.
  if (!is.null(block_by) && (!is.character(block_by) || length(block_by) != 1))
    stop(sprintf("block_by must be NULL or a character vector of length 1, got length %d", length(block_by)))
  # is.na() is checked before the range comparisons so an NA yields the
  # descriptive error instead of a failing `if` condition.
  if (!is.numeric(max_dist) || length(max_dist) != 1 || is.na(max_dist) ||
      max_dist < 0 || max_dist > 1)
    stop(sprintf("max_dist must be a number between 0 and 1, got %s", deparse(max_dist)))
  if (!is.numeric(n_threads) || length(n_threads) != 1 || is.na(n_threads) ||
      n_threads < 1)
    stop(sprintf("n_threads must be a positive integer, got %s", deparse(n_threads)))

  # Zero-based method code passed to the native routine: dl = 0,
  # levenshtein = 1, jw = 2. Anything but a single known string is rejected.
  method_int <- if (is.character(method) && length(method) == 1) {
    match(method, c("dl", "levenshtein", "jw")) - 1L
  } else {
    NA_integer_
  }
  if (is.na(method_int)) stop("method must be 'dl', 'levenshtein', or 'jw'")

  # ---- Key resolution ------------------------------------------------------
  # For c("probe_col" = "build_col") the name is the probe column and the
  # value the build column; an unnamed entry uses the same column on both
  # sides. This applies equally to `by` and `block_by`.
  probe_name <- function(spec) {
    nm <- names(spec)
    if (is.null(nm) || nm == "") unname(spec) else nm
  }

  by_build <- unname(by)
  by_probe <- probe_name(by)

  block_probe <- NULL
  block_build <- NULL
  if (!is.null(block_by)) {
    block_build <- unname(block_by)
    # Previously an unnamed block_by left block_probe as NULL while
    # block_build was set; it now falls back to the build column like `by`.
    block_probe <- probe_name(block_by)
  }

  new_xptr <- .Call(C_fuzzy_join_node,
                    x$.node, y$.node,
                    by_probe, by_build,
                    block_probe, block_build,
                    as.integer(method_int),
                    as.double(max_dist),
                    as.integer(n_threads),
                    suffix)
  structure(list(.node = new_xptr, .path = NULL), class = "vectra_node")
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.