R/record-batch.R

Defines functions as_record_batch.data.frame as_record_batch.arrow_dplyr_query as_record_batch.Table as_record_batch.RecordBatch as_record_batch cbind.RecordBatch cbind_check_length rbind.RecordBatch names.RecordBatch from_message create

Documented in as_record_batch as_record_batch.arrow_dplyr_query as_record_batch.data.frame as_record_batch.RecordBatch as_record_batch.Table

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' @include arrow-object.R
#' @include array.R
#' @title RecordBatch class
#' @description A record batch is a collection of equal-length arrays matching
#' a particular [Schema]. It is a table-like data structure that is semantically
#' a sequence of [fields][Field], each a contiguous Arrow [Array].
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @section S3 Methods and Usage:
#' Record batches are data-frame-like, and many methods you expect to work on
#' a `data.frame` are implemented for `RecordBatch`. This includes `[`, `[[`,
#' `$`, `names`, `dim`, `nrow`, `ncol`, `head`, and `tail`. You can also pull
#' the data from an Arrow record batch into R with `as.data.frame()`. See the
#' examples.
#'
#' A caveat about the `$` method: because `RecordBatch` is an `R6` object,
#' `$` is also used to access the object's methods (see below). Methods take
#' precedence over the table's columns. So, `batch$Slice` would return the
#' "Slice" method function even if there were a column in the table called
#' "Slice".
#'
#' @section R6 Methods:
#' In addition to the more R-friendly S3 methods, a `RecordBatch` object has
#' the following R6 methods that map onto the underlying C++ methods:
#'
#' - `$Equals(other)`: Returns `TRUE` if the `other` record batch is equal
#' - `$column(i)`: Extract an `Array` by integer position from the batch
#' - `$column_name(i)`: Get a column's name by integer position
#' - `$names()`: Get all column names (called by `names(batch)`)
#' - `$nbytes()`: Total number of bytes consumed by the elements of the record batch
#' - `$RenameColumns(value)`: Set all column names (called by `names(batch) <- value`)
#' - `$GetColumnByName(name)`: Extract an `Array` by string name
#' - `$RemoveColumn(i)`: Drops a column from the batch by integer position
#' - `$SelectColumns(indices)`: Return a new record batch with a selection of columns, expressed as 0-based integers.
#' - `$Slice(offset, length = NULL)`: Create a zero-copy view starting at the
#'    indicated integer offset and going for the given length, or to the end
#'    of the table if `NULL`, the default.
#' - `$Take(i)`: return an `RecordBatch` with rows at positions given by
#'    integers (R vector or Array Array) `i`.
#' - `$Filter(i, keep_na = TRUE)`: return an `RecordBatch` with rows at positions where logical
#'    vector (or Arrow boolean Array) `i` is `TRUE`.
#' - `$SortIndices(names, descending = FALSE)`: return an `Array` of integer row
#'    positions that can be used to rearrange the `RecordBatch` in ascending or
#'    descending order by the first named column, breaking ties with further named
#'    columns. `descending` can be a logical vector of length one or of the same
#'    length as `names`.
#' - `$serialize()`: Returns a raw vector suitable for interprocess communication
#' - `$cast(target_schema, safe = TRUE, options = cast_options(safe))`: Alter
#'    the schema of the record batch.
#'
#' There are also some active bindings
#' - `$num_columns`
#' - `$num_rows`
#' - `$schema`
#' - `$metadata`: Returns the key-value metadata of the `Schema` as a named list.
#'    Modify or replace by assigning in (`batch$metadata <- new_metadata`).
#'    All list elements are coerced to string. See `schema()` for more information.
#' - `$columns`: Returns a list of `Array`s
#' @rdname RecordBatch-class
#' @name RecordBatch
#' @export
RecordBatch <- R6Class("RecordBatch",
  inherit = ArrowTabular,
  public = list(
    column = function(i) RecordBatch__column(self, i),
    column_name = function(i) RecordBatch__column_name(self, i),
    names = function() RecordBatch__names(self),
    nbytes = function() RecordBatch__ReferencedBufferSize(self),
    RenameColumns = function(value) RecordBatch__RenameColumns(self, value),
    Equals = function(other, check_metadata = FALSE, ...) {
      inherits(other, "RecordBatch") && RecordBatch__Equals(self, other, isTRUE(check_metadata))
    },
    GetColumnByName = function(name) {
      assert_that(is.string(name))
      RecordBatch__GetColumnByName(self, name)
    },
    SelectColumns = function(indices) RecordBatch__SelectColumns(self, indices),
    AddColumn = function(i, new_field, value) {
      RecordBatch__AddColumn(self, i, new_field, value)
    },
    SetColumn = function(i, new_field, value) {
      RecordBatch__SetColumn(self, i, new_field, value)
    },
    RemoveColumn = function(i) RecordBatch__RemoveColumn(self, i),
    ReplaceSchemaMetadata = function(new) {
      RecordBatch__ReplaceSchemaMetadata(self, prepare_key_value_metadata(new))
    },
    Slice = function(offset, length = NULL) {
      if (is.null(length)) {
        RecordBatch__Slice1(self, offset)
      } else {
        RecordBatch__Slice2(self, offset, length)
      }
    },
    # Take, Filter, and SortIndices are methods on ArrowTabular
    serialize = function() ipc___SerializeRecordBatch__Raw(self),
    to_data_frame = function() {
      RecordBatch__to_dataframe(self, use_threads = option_use_threads())
    },
    cast = function(target_schema, safe = TRUE, ..., options = cast_options(safe, ...)) {
      assert_is(target_schema, "Schema")
      assert_that(identical(self$schema$names, target_schema$names), msg = "incompatible schemas")
      RecordBatch__cast(self, target_schema, options)
    },
    export_to_c = function(array_ptr, schema_ptr) {
      ExportRecordBatch(self, array_ptr, schema_ptr)
    }
  ),
  active = list(
    num_columns = function() RecordBatch__num_columns(self),
    num_rows = function() RecordBatch__num_rows(self),
    schema = function() RecordBatch__schema(self),
    columns = function() RecordBatch__columns(self)
  )
)

RecordBatch$create <- function(..., schema = NULL) {
  arrays <- list2(...)
  if (length(arrays) == 1 && inherits(arrays[[1]], c("raw", "Buffer", "InputStream", "Message"))) {
    return(RecordBatch$from_message(arrays[[1]], schema))
  }

  # Else, a list of arrays or data.frames
  # making sure there are always names
  if (is.null(names(arrays))) {
    names(arrays) <- rep_len("", length(arrays))
  }
  stopifnot(length(arrays) > 0)

  # If any arrays are length 1, recycle them
  arrays <- recycle_scalars(arrays)

  # TODO: should this also assert that they're all Arrays?
  RecordBatch__from_arrays(schema, arrays)
}

RecordBatch$from_message <- function(obj, schema) {
  # Message/Buffer readers, previously in read_record_batch()
  assert_is(schema, "Schema")
  if (inherits(obj, c("raw", "Buffer"))) {
    obj <- BufferReader$create(obj)
    on.exit(obj$close())
  }
  if (inherits(obj, "InputStream")) {
    ipc___ReadRecordBatch__InputStream__Schema(obj, schema)
  } else {
    ipc___ReadRecordBatch__Message__Schema(obj, schema)
  }
}
#' @include arrowExports.R
RecordBatch$import_from_c <- ImportRecordBatch

#' Create a RecordBatch
#'
#' @param ... A `data.frame` or a named set of Arrays or vectors. If given a
#' mixture of data.frames and vectors, the inputs will be autospliced together
#' (see examples). Alternatively, you can provide a single Arrow IPC
#' `InputStream`, `Message`, `Buffer`, or R `raw` object containing a `Buffer`.
#' @param schema a [Schema], or `NULL` (the default) to infer the schema from
#' the data in `...`. When providing an Arrow IPC buffer, `schema` is required.
#' @rdname record_batch
#' @examples
#' batch <- record_batch(name = rownames(mtcars), mtcars)
#' dim(batch)
#' dim(head(batch))
#' names(batch)
#' batch$mpg
#' batch[["cyl"]]
#' as.data.frame(batch[4:8, c("gear", "hp", "wt")])
#' @export
record_batch <- RecordBatch$create

#' @export
names.RecordBatch <- function(x) x$names()

#' @export
rbind.RecordBatch <- function(...) {
  abort("Use `Table$create()` to combine RecordBatches into a Table")
}

cbind_check_length <- function(inputs, call = caller_env()) {
  sizes <- map_dbl(inputs, NROW)
  ok_lengths <- sizes %in% c(head(sizes, 1), 1)
  if (!all(ok_lengths)) {
    first_bad_one <- which.min(ok_lengths)
    abort(
      c("Non-scalar inputs must have an equal number of rows.",
        i = sprintf("..1 has %d, ..%d has %d", sizes[[1]], first_bad_one, sizes[[first_bad_one]])
      ),
      call = call
    )
  }
}

#' @export
cbind.RecordBatch <- function(...) {
  call <- sys.call()
  inputs <- list(...)
  arg_names <- if (is.null(names(inputs))) {
    rep("", length(inputs))
  } else {
    names(inputs)
  }

  cbind_check_length(inputs, call)

  columns <- flatten(map(seq_along(inputs), function(i) {
    input <- inputs[[i]]
    name <- arg_names[i]

    if (inherits(input, "RecordBatch")) {
      set_names(input$columns, names(input))
    } else if (inherits(input, "data.frame")) {
      as.list(input)
    } else if (inherits(input, "Table") || inherits(input, "ChunkedArray")) {
      abort("Cannot cbind a RecordBatch with Tables or ChunkedArrays",
        i = "Hint: consider converting the RecordBatch into a Table first"
      )
    } else {
      if (name == "") {
        abort("Vector and array arguments must have names",
          i = sprintf("Argument ..%d is missing a name", i)
        )
      }
      list2("{name}" := input)
    }
  }))

  RecordBatch$create(!!!columns)
}

#' Convert an object to an Arrow RecordBatch
#'
#' Whereas [record_batch()] constructs a [RecordBatch] from one or more columns,
#' `as_record_batch()` converts a single object to an Arrow [RecordBatch].
#'
#' @param x An object to convert to an Arrow RecordBatch
#' @param ... Passed to S3 methods
#' @inheritParams record_batch
#'
#' @return A [RecordBatch]
#' @export
#'
#' @examples
#' # use as_record_batch() for a single object
#' as_record_batch(data.frame(col1 = 1, col2 = "two"))
#'
#' # use record_batch() to create from columns
#' record_batch(col1 = 1, col2 = "two")
#'
as_record_batch <- function(x, ..., schema = NULL) {
  UseMethod("as_record_batch")
}

#' @rdname as_record_batch
#' @export
as_record_batch.RecordBatch <- function(x, ..., schema = NULL) {
  if (is.null(schema)) {
    x
  } else {
    x$cast(schema)
  }
}

#' @rdname as_record_batch
#' @export
as_record_batch.Table <- function(x, ..., schema = NULL) {
  if (x$num_columns == 0) {
    batch <- record_batch(data.frame())
    return(batch$Take(rep_len(0, x$num_rows)))
  }

  arrays_out <- lapply(x$columns, as_arrow_array)
  names(arrays_out) <- names(x)
  out <- RecordBatch$create(!!!arrays_out)
  if (!is.null(schema)) {
    out <- out$cast(schema)
  }

  out
}

#' @rdname as_record_batch
#' @export
as_record_batch.arrow_dplyr_query <- function(x, ...) {
  as_record_batch(compute.arrow_dplyr_query(x), ...)
}

#' @rdname as_record_batch
#' @export
as_record_batch.data.frame <- function(x, ..., schema = NULL) {
  RecordBatch$create(x, schema = schema)
}

Try the arrow package in your browser

Any scripts or data that you put into this service are public.

arrow documentation built on Nov. 25, 2023, 1:09 a.m.