R/array-stream.R

Defines functions `$.nanoarrow_array_stream` `[[.nanoarrow_array_stream` names.nanoarrow_array_stream length.nanoarrow_array_stream format.nanoarrow_array_stream print.nanoarrow_array_stream str.nanoarrow_array_stream as.vector.nanoarrow_array_stream as.data.frame.nanoarrow_array_stream infer_nanoarrow_schema.nanoarrow_array_stream as_nanoarrow_array_stream.data.frame as_nanoarrow_array_stream.default as_nanoarrow_array_stream.nanoarrow_array as_nanoarrow_array_stream.nanoarrow_array_stream as_nanoarrow_array_stream array_stream_set_finalizer basic_array_stream

Documented in array_stream_set_finalizer as_nanoarrow_array_stream basic_array_stream

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Create ArrayStreams from batches
#'
#' @param batches A [list()] of [nanoarrow_array][as_nanoarrow_array] objects
#'   or objects that can be coerced via [as_nanoarrow_array()].
#' @param schema A [nanoarrow_schema][as_nanoarrow_schema] or `NULL` to guess
#'   based on the first schema.
#' @param validate Use `FALSE` to skip the validation step (i.e., if you
#'   know that the arrays are valid).
#'
#' @return An [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @export
#'
#' @examples
#' (stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
#' as.data.frame(stream$get_next())
#' stream$get_next()
#'
basic_array_stream <- function(batches, schema = NULL, validate = TRUE) {
  # Error for everything except a bare list (e.g., so that calling with
  # a data.frame() does not unintentionally loop over columns)
  if (!identical(class(batches), "list")) {
    stop("`batches` must be an unclassed `list()`")
  }

  batches <- lapply(batches, as_nanoarrow_array, schema = schema)

  if (is.null(schema) && length(batches) > 0) {
    schema <- infer_nanoarrow_schema(batches[[1]])
  } else if (is.null(schema)) {
    stop("Can't infer schema from first batch if there are zero batches")
  }

  .Call(nanoarrow_c_basic_array_stream, batches, schema, validate)
}

#' Register an array stream finalizer
#'
#' In some cases, R functions that return a [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' may require that the scope of some other object outlive that of the array
#' stream. If there is a need for that object to be released deterministically
#' (e.g., to close open files), you can register a function to run after the
#' stream's release callback is invoked from the R thread. Note that this
#' finalizer will **not** be run if the stream's release callback is invoked
#' from a **non**-R thread. In this case, the finalizer and its chain of
#' environments will be garbage-collected when `nanoarrow:::preserved_empty()`
#' is run.
#'
#' @param array_stream A [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @param finalizer A function that will be called with zero arguments.
#'
#' @return A newly allocated `array_stream` whose release callback will call
#'   the supplied finalizer.
#' @export
#'
#' @examples
#' stream <- array_stream_set_finalizer(
#'   basic_array_stream(list(1:5)),
#'   function() message("All done!")
#' )
#' stream$release()
#'
array_stream_set_finalizer <- function(array_stream, finalizer) {
  stopifnot(is.function(finalizer))

  prot <- new.env(parent = emptyenv())
  prot$array_stream_finalizer <- finalizer
  class(prot) <- "nanoarrow_array_stream_finalizer"

  nanoarrow_pointer_set_protected(array_stream, prot)
  out <- nanoarrow_allocate_array_stream()
  nanoarrow_pointer_export(array_stream, out)
  out
}

#' Convert an object to a nanoarrow array_stream
#'
#' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
#' as defined in the Arrow C Stream interface. This object is used to represent
#' a stream of [arrays][as_nanoarrow_array] with a common
#' [schema][as_nanoarrow_schema]. This is similar to an
#' [arrow::RecordBatchReader] except it can be used to represent a stream of
#' any type (not just record batches). Note that a stream of record batches
#' and a stream of non-nullable struct arrays are represented identically.
#' Also note that array streams are mutable objects and are passed by
#' reference and not by value.
#'
#' @param x An object to convert to a array_stream
#' @param ... Passed to S3 methods
#' @inheritParams as_nanoarrow_array
#'
#' @return An object of class 'nanoarrow_array_stream'
#' @export
#'
#' @examples
#' (stream <- as_nanoarrow_array_stream(data.frame(x = 1:5)))
#' stream$get_schema()
#' stream$get_next()
#'
#' # The last batch is returned as NULL
#' stream$get_next()
#'
#' # Release the stream
#' stream$release()
#'
as_nanoarrow_array_stream <- function(x, ..., schema = NULL) {
  UseMethod("as_nanoarrow_array_stream")
}

#' @export
as_nanoarrow_array_stream.nanoarrow_array_stream <- function(x, ..., schema = NULL) {
  if (is.null(schema)) {
    return(x)
  }

  inferred_schema <- infer_nanoarrow_schema(x)
  if (nanoarrow_schema_identical(schema, inferred_schema)) {
    return(x)
  }

  NextMethod()
}

#' @export
as_nanoarrow_array_stream.nanoarrow_array <- function(x, ..., schema = NULL) {
  if (is.null(schema)) {
    return(basic_array_stream(list(x), validate = FALSE))
  }

  inferred_schema <- infer_nanoarrow_schema(x)
  if (nanoarrow_schema_identical(schema, inferred_schema)) {
    return(basic_array_stream(list(x), validate = FALSE))
  }

  as_nanoarrow_array_stream(
    as_nanoarrow_array_stream(x),
    schema = schema
  )
}

#' @export
as_nanoarrow_array_stream.default <- function(x, ..., schema = NULL) {
  assert_arrow_installed("default coerce to nanoarrow_array_stream")

  as_nanoarrow_array_stream(
    arrow::as_record_batch_reader(x, ..., schema = arrow::as_schema(schema)),
    schema = schema
  )
}

#' @export
as_nanoarrow_array_stream.data.frame <- function(x, ..., schema = NULL) {
  if (is.null(schema)) {
    schema <- infer_nanoarrow_schema(x)
  } else {
    schema <- as_nanoarrow_schema(schema)
  }

  x <- as_nanoarrow_array(x, schema = schema)
  basic_array_stream(list(x), schema = schema)
}

#' @export
infer_nanoarrow_schema.nanoarrow_array_stream <- function(x, ...) {
  x$get_schema()
}

#' @export
as.data.frame.nanoarrow_array_stream <- function(x, ...) {
  # Always release the input: we are always consuming the entire stream.
  # For more fine-grained behaviour on error, one can use
  # convert_array_stream()
  on.exit(x$release())

  to <- infer_nanoarrow_ptype(x$get_schema())
  if (!inherits(to, "data.frame")) {
    stop("Can't convert non-struct array stream to data.frame")
  }

  convert_array_stream(x, to)
}

#' @export
as.vector.nanoarrow_array_stream <- function(x, mode) {
  on.exit(x$release())
  convert_array_stream(x)
}

#' @importFrom utils str
#' @export
str.nanoarrow_array_stream <- function(object, ...) {
  cat(sprintf("%s\n", format(object)))

  if (nanoarrow_pointer_is_valid(object)) {
    # Use the str() of the list version but remove the first
    # line of the output ("List of 2")
    info <- list(
      get_schema = object$get_schema,
      get_next = object$get_next,
      release = object$release
    )
    raw_str_output <- utils::capture.output(str(info, ..., give.attr = FALSE))
    cat(paste0(raw_str_output[-1], collapse = "\n"))
    cat("\n")
  }

  invisible(object)
}

#' @export
print.nanoarrow_array_stream <- function(x, ...) {
  str(x, ...)
  invisible(x)
}

#' @export
format.nanoarrow_array_stream <- function(x, ...) {
  if (nanoarrow_pointer_is_valid(x)) {
    tryCatch(
      sprintf("<nanoarrow_array_stream %s>", nanoarrow_schema_formatted(x$get_schema())),
      error = function(...) "<nanoarrow_array_stream[<error calling get_schema()]>"
    )

  } else {
    "<nanoarrow_array_stream[invalid pointer]>"
  }
}

# This is the list()-like interface to nanoarrow_array_stream that allows $ and [[
# to make nice auto-complete when interacting in an IDE

#' @export
length.nanoarrow_array_stream <- function(x, ...) {
  3L
}

#' @export
names.nanoarrow_array_stream <- function(x, ...) {
  c("get_schema", "get_next", "release")
}

#' @export
`[[.nanoarrow_array_stream` <- function(x, i, ...) {
  force(x)
  if (identical(i, "get_schema") || isTRUE(i == 1L)) {
    function() .Call(nanoarrow_c_array_stream_get_schema, x)
  } else if (identical(i, "get_next") || isTRUE(i == 2L)) {
    function(schema = x$get_schema(), validate = TRUE) {
      array <- .Call(nanoarrow_c_array_stream_get_next, x)
      if (!nanoarrow_pointer_is_valid(array)) {
        return(NULL)
      }

      nanoarrow_array_set_schema(array, schema, validate = validate)
      array
    }
  } else if (identical(i, "release") || isTRUE(i == 3L)) {
    function() nanoarrow_pointer_release(x)
  } else {
    NULL
  }
}

#' @export
`$.nanoarrow_array_stream` <- function(x, i, ...) {
  x[[i]]
}

Try the nanoarrow package in your browser

Any scripts or data that you put into this service are public.

nanoarrow documentation built on June 22, 2024, 9:37 a.m.