# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# nolint start: cyclocomp_linter,
ExecPlan <- R6Class("ExecPlan",
  inherit = ArrowObject,
  public = list(
    # Create a source node for `dataset`, pushing filter and projection down
    # into the scanner when the source is a Dataset. Accepts a Dataset,
    # RecordBatchReader, ArrowTabular, or an arrow_dplyr_query wrapping one
    # of those. Returns an ExecNode.
    Scan = function(dataset) {
      if (inherits(dataset, c("RecordBatchReader", "ArrowTabular"))) {
        return(self$SourceNode(dataset))
      } else if (inherits(dataset, "arrow_dplyr_query")) {
        if (inherits(dataset$.data, c("RecordBatchReader", "ArrowTabular"))) {
          # There's no predicate pushdown to do, so no need to deal with other
          # arrow_dplyr_query attributes here. They'll be handled by other
          # ExecNodes
          return(self$SourceNode(dataset$.data))
        }
        # Else, we're scanning a Dataset, and we need to pull out the filter
        # and projection (column selection) to push down into the scanner
        filter <- dataset$filtered_rows
        if (isTRUE(filter)) {
          filter <- Expression$scalar(TRUE)
        }
        projection <- dataset$selected_columns
        dataset <- dataset$.data
        assert_is(dataset, "Dataset")
      } else {
        assert_is(dataset, "Dataset")
        # Just a dataset, not a query, so there's no predicates to push down
        # so set some defaults
        filter <- Expression$scalar(TRUE)
        # Select all of the dataset's columns.
        # (Bug fix: this previously passed the bare base function `colnames`
        # to make_field_refs() instead of the dataset's column names.)
        projection <- make_field_refs(names(dataset$schema))
      }
      out <- ExecNode_Scan(self, dataset, filter, projection)
      # Hold onto the source data's schema so we can preserve schema metadata
      # in the resulting Scan/Write
      out$extras$source_schema <- dataset$schema
      out
    },
    # Create a source node directly from in-memory data:
    # a RecordBatchReader or anything ArrowTabular (Table/RecordBatch).
    SourceNode = function(.data) {
      if (inherits(.data, "RecordBatchReader")) {
        out <- ExecNode_SourceNode(self, .data)
      } else {
        assert_is(.data, "ArrowTabular")
        out <- ExecNode_TableSourceNode(self, as_arrow_table(.data))
      }
      # Hold onto the source data's schema so we can preserve schema metadata
      # in the resulting Scan/Write
      out$extras$source_schema <- .data$schema
      out
    },
    # Translate an arrow_dplyr_query into a chain of ExecNodes.
    # This method takes an arrow_dplyr_query and chains together the
    # ExecNodes that they produce. It does not evaluate them--that is Run().
    Build = function(.data) {
      group_vars <- dplyr::group_vars(.data)
      grouped <- length(group_vars) > 0
      .data <- ensure_group_vars(.data)
      .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns
      if (is_collapsed(.data)) {
        # We have a nested query.
        if (has_unordered_head(.data$.data)) {
          # TODO(GH-34941): FetchNode should do non-deterministic fetch
          # Instead, we need to evaluate the query up to here,
          # and then do a new query for the rest.
          # as_record_batch_reader() will build and run an ExecPlan and do head() on it
          reader <- as_record_batch_reader(.data$.data)
          on.exit(reader$.unsafe_delete())
          node <- self$SourceNode(reader)
        } else {
          # Recurse
          node <- self$Build(.data$.data)
        }
      } else {
        node <- self$Scan(.data)
      }
      # ARROW-13498: Even though Scan takes the filter (if you have a Dataset),
      # we have to do it again
      if (inherits(.data$filtered_rows, "Expression")) {
        node <- node$Filter(.data$filtered_rows)
      }
      if (!is.null(.data$aggregations)) {
        # Project to include just the data required for each aggregation,
        # plus group_by_vars (last)
        # TODO: validate that none of names(aggregations) are the same as names(group_by_vars)
        # dplyr does not error on this but the result it gives isn't great
        projection <- summarize_projection(.data)
        # skip projection if no grouping and all aggregate functions are nullary
        if (length(projection)) {
          node <- node$Project(projection)
        }
        if (grouped) {
          # We need to prefix all of the aggregation function names with "hash_"
          .data$aggregations <- lapply(.data$aggregations, function(x) {
            x[["fun"]] <- paste0("hash_", x[["fun"]])
            x
          })
        }
        .data$aggregations <- imap(.data$aggregations, function(x, name) {
          # Embed `name` and `targets` inside the aggregation objects
          x[["name"]] <- name
          x[["targets"]] <- aggregate_target_names(x$data, name)
          x
        })
        node <- node$Aggregate(
          options = .data$aggregations,
          key_names = group_vars
        )
      } else {
        # If any columns are derived, reordered, or renamed we need to Project
        # If there are aggregations, the projection was already handled above.
        # We have to project at least once to eliminate some junk columns
        # that the ExecPlan adds:
        # __fragment_index, __batch_index, __last_in_fragment
        #
        # $Project() will check whether we actually need to project, so that
        # repeated projection of the same thing
        # (as when we've done collapse() and not projected after) is avoided
        projection <- c(.data$selected_columns, .data$temp_columns)
        node <- node$Project(projection)
        if (!is.null(.data$join)) {
          right_node <- self$Build(.data$join$right_data)
          node <- node$Join(
            type = .data$join$type,
            right_node = right_node,
            by = .data$join$by,
            left_output = .data$join$left_output,
            right_output = .data$join$right_output,
            left_suffix = .data$join$suffix[[1]],
            right_suffix = .data$join$suffix[[2]]
          )
        }
        if (!is.null(.data$union_all)) {
          node <- node$Union(self$Build(.data$union_all$right_data))
        }
      }
      # Apply sorting and head/tail
      head_or_tail <- .data$head %||% .data$tail
      if (length(.data$arrange_vars)) {
        if (!is.null(.data$tail)) {
          # Handle tail first: Reverse sort, take head
          # TODO(GH-34942): FetchNode support for tail
          node <- node$OrderBy(list(
            names = names(.data$arrange_vars),
            orders = as.integer(!.data$arrange_desc)
          ))
          node <- node$Fetch(.data$tail)
        }
        # Apply sorting
        node <- node$OrderBy(list(
          names = names(.data$arrange_vars),
          orders = as.integer(.data$arrange_desc)
        ))
        if (length(.data$temp_columns)) {
          # If we sorted on ad-hoc derived columns, Project to drop them
          temp_schema <- node$schema
          cols_to_keep <- setdiff(names(temp_schema), names(.data$temp_columns))
          node <- node$Project(make_field_refs(cols_to_keep))
        }
        if (!is.null(.data$head)) {
          # Take the head now
          node <- node$Fetch(.data$head)
        }
      } else if (!is.null(head_or_tail)) {
        # Unsorted head/tail
        # Handle a couple of special cases here:
        if (node$has_ordered_batches()) {
          # Data that has order, even implicit order from an in-memory table, is supported
          # in FetchNode
          if (!is.null(.data$head)) {
            node <- node$Fetch(.data$head)
          } else {
            # TODO(GH-34942): FetchNode support for tail
            # FetchNode currently doesn't support tail, but it has limit + offset
            # So if we know how many rows the query will result in, we can offset
            data_without_tail <- .data
            data_without_tail$tail <- NULL
            row_count <- nrow(data_without_tail)
            if (!is.na(row_count)) {
              node <- node$Fetch(.data$tail, offset = row_count - .data$tail)
            } else {
              # Workaround: non-deterministic tail
              node$extras$slice_size <- head_or_tail
            }
          }
        } else {
          # TODO(GH-34941): non-deterministic FetchNode
          # Data has non-deterministic order, so head/tail means "just show me any N rows"
          # FetchNode does not support non-deterministic scans, so we have to handle outside
          node$extras$slice_size <- head_or_tail
        }
      }
      node
    },
    # Execute the plan, sinking at `node`; returns a reader/result from
    # ExecPlan_run, with the non-deterministic head/tail workaround applied.
    Run = function(node) {
      assert_is(node, "ExecNode")
      out <- ExecPlan_run(
        self,
        node,
        prepare_key_value_metadata(node$final_metadata())
      )
      if (!is.null(node$extras$slice_size)) {
        # For non-deterministic scans, head/tail are
        # essentially taking a random slice from somewhere in the dataset.
        # And since the head() implementation is way more efficient than tail(),
        # just use it to take the random slice
        out <- head(out, node$extras$slice_size)
      }
      out
    },
    # Execute the plan and write the results out to a dataset; `...` is
    # forwarded to ExecPlan_Write.
    Write = function(node, ...) {
      # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
      final_metadata <- prepare_key_value_metadata(node$final_metadata())
      ExecPlan_Write(
        self,
        node,
        node$schema$WithMetadata(final_metadata),
        ...
      )
    },
    ToString = function() {
      ExecPlan_ToString(self)
    },
    # Eagerly release the plan's resources (instead of waiting for GC)
    .unsafe_delete = function() {
      ExecPlan_UnsafeDelete(self)
      super$.unsafe_delete()
    }
  )
)
# nolint end.
# Factory for ExecPlan instances; `use_threads` controls whether the plan
# executes on the CPU thread pool (defaults to the package-level option).
ExecPlan$create <- function(use_threads = option_use_threads()) {
  ExecPlan_create(use_threads)
}
ExecNode <- R6Class("ExecNode",
  inherit = ArrowObject,
  public = list(
    # Out-of-band state carried along the node chain (not part of the
    # C++ ExecNode itself)
    extras = list(
      # Workaround for non-deterministic head/tail
      slice_size = NULL,
      # `source_schema` is stashed here by Scan()/SourceNode() so that at
      # Run/Write time the relevant metadata can be kept in the result
      source_schema = NULL
    ),
    # Copy this node's extras onto a downstream node and return it;
    # every chaining method below funnels through here.
    preserve_extras = function(new_node) {
      new_node$extras <- self$extras
      new_node
    },
    # Metadata for the final output: the source schema's metadata with the
    # R column metadata trimmed to the columns that survive in the result
    final_metadata = function() {
      src_schema <- self$extras$source_schema
      meta <- src_schema$metadata
      meta$r <- get_r_metadata_from_old_schema(self$schema, src_schema)
      meta
    },
    has_ordered_batches = function() {
      ExecNode_has_ordered_batches(self)
    },
    # Add a ProjectNode. An empty `cols` still projects (to strip the
    # plan's internal junk columns); a projection that would be a no-op
    # is skipped entirely.
    Project = function(cols) {
      if (!length(cols)) {
        return(
          self$preserve_extras(ExecNode_Project(self, character(0), character(0)))
        )
      }
      assert_is_list_of(cols, "Expression")
      if (!needs_projection(cols, self$schema)) {
        # Projecting would change nothing; reuse this node
        return(self)
      }
      self$preserve_extras(ExecNode_Project(self, cols, names(cols)))
    },
    Filter = function(expr) {
      assert_is(expr, "Expression")
      self$preserve_extras(ExecNode_Filter(self, expr))
    },
    Aggregate = function(options, key_names) {
      agg_node <- self$preserve_extras(ExecNode_Aggregate(self, options, key_names))
      # dplyr drops top-level attributes when you call summarize()
      agg_node$extras$source_schema$metadata[["r"]]$attributes <- NULL
      agg_node
    },
    Join = function(type, right_node, by, left_output, right_output, left_suffix, right_suffix) {
      joined <- ExecNode_Join(
        self,
        type,
        right_node,
        left_keys = names(by),
        right_keys = by,
        left_output = left_output,
        right_output = right_output,
        output_suffix_for_left = left_suffix,
        output_suffix_for_right = right_suffix
      )
      self$preserve_extras(joined)
    },
    Union = function(right_node) {
      self$preserve_extras(ExecNode_Union(self, right_node))
    },
    Fetch = function(limit, offset = 0L) {
      self$preserve_extras(ExecNode_Fetch(self, offset, limit))
    },
    OrderBy = function(sorting) {
      self$preserve_extras(ExecNode_OrderBy(self, sorting))
    }
  ),
  active = list(
    # Output schema of this node
    schema = function() {
      ExecNode_output_schema(self)
    }
  )
)
# A RecordBatchReader that owns the ExecPlan producing its batches, so the
# plan's status can be inspected while (or after) reading.
ExecPlanReader <- R6Class("ExecPlanReader",
  inherit = RecordBatchReader,
  public = list(
    batches = function() {
      ExecPlanReader__batches(self)
    },
    read_table = function() {
      Table__from_ExecPlanReader(self)
    },
    Plan = function() {
      ExecPlanReader__Plan(self)
    },
    PlanStatus = function() {
      ExecPlanReader__PlanStatus(self)
    },
    ToString = function() {
      status <- self$PlanStatus()
      sprintf(
        "<Status: %s>\n\n%s\n\nSee $Plan() for details.",
        status,
        super$ToString()
      )
    }
  )
)
#' @export
head.ExecPlanReader <- function(x, n = 6L, ...) {
  # head() of an ExecPlanReader must itself be an ExecPlanReader so that
  # evaluation still happens in a way that supports calls back into R;
  # wrap the truncated reader in a query and rebuild a reader from it.
  truncated <- RecordBatchReader__Head(x, n)
  as_record_batch_reader(as_adq(truncated))
}
# Execute a Substrait plan. `substrait_plan` may be a JSON string (parsed
# via the internal Substrait JSON reader) or a raw() vector containing a
# serialized plan (wrapped in a Buffer); anything else is an error.
do_exec_plan_substrait <- function(substrait_plan) {
  plan_buffer <- if (is.string(substrait_plan)) {
    substrait__internal__SubstraitFromJSON(substrait_plan)
  } else if (is.raw(substrait_plan)) {
    buffer(substrait_plan)
  } else {
    abort("`substrait_plan` must be a JSON string or raw() vector")
  }
  exec_plan <- ExecPlan$create()
  # Release the plan's resources even if execution fails
  on.exit(exec_plan$.unsafe_delete())
  ExecPlan_run_substrait(exec_plan, plan_buffer)
}
# Would applying `projection` to data with the given `schema` change
# anything? TRUE when any expression is not a plain field reference, or
# when columns are renamed or reordered relative to the schema.
needs_projection <- function(projection, schema) {
  field_names <- unname(
    vapply(projection, function(expr) expr$field_name, character(1))
  )
  # Non-FieldRef expressions have an empty field_name, hence the nzchar test
  !all(nzchar(field_names)) ||
    !identical(field_names, names(projection)) ||
    !identical(field_names, names(schema))
}