R/ipc.R

Defines functions guess_zip_filename guess_connection_type check_stream_if_requested example_ipc_stream read_nanoarrow.connection read_nanoarrow.character read_nanoarrow.raw read_nanoarrow

Documented in example_ipc_stream read_nanoarrow

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Read serialized streams of Arrow data
#'
#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow
#' data. Arrow documentation typically refers to this format as "Arrow IPC",
#' since its origin was as a means to transmit tables between processes
#' (e.g., multiple R sessions). This format can also be written to and read
#' from files or URLs and is essentially a high performance equivalent of
#' a CSV file that does a better job maintaining types.
#'
#' The nanoarrow package does not currently have the ability to write serialized
#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use
#' the equivalent writer from another Arrow implementation in Python, C++,
#' Rust, JavaScript, Julia, C#, and beyond.
#'
#' The media type of an Arrow stream is `application/vnd.apache.arrow.stream`
#' and the recommended file extension is `.arrows`.
#'
#' A connection that is not already open is opened in binary mode and is
#' closed when the returned stream is released.
#'
#' @param x A `raw()` vector, connection, file path, or URL from which to read
#'   binary data. File paths with common extensions indicating compression
#'   (.gz, .bz2, .zip) are automatically uncompressed; a .zip archive must
#'   contain exactly one file. Reading compressed streams from URLs is not
#'   supported.
#' @param lazy By default, `read_nanoarrow()` will read and discard a copy of
#'   the reader's schema to ensure that invalid streams are discovered as
#'   soon as possible. Use `lazy = TRUE` to defer this check until the reader
#'   is actually consumed.
#' @param ... Currently unused.
#'
#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream].
#'   `example_ipc_stream()` returns a `raw()` vector containing a small
#'   serialized stream that can be passed to `read_nanoarrow()`.
#' @export
#'
#' @examples
#' as.data.frame(read_nanoarrow(example_ipc_stream()))
#'
read_nanoarrow <- function(x, ..., lazy = FALSE) {
  UseMethod("read_nanoarrow")
}

#' @export
read_nanoarrow.raw <- function(x, ..., lazy = FALSE) {
  # Wrap the raw vector as a nanoarrow buffer and hand it to the native
  # IPC reader; the schema is validated up front unless lazy = TRUE.
  stream <- .Call(nanoarrow_c_ipc_array_reader_buffer, as_nanoarrow_buffer(x))
  check_stream_if_requested(stream, lazy)
}

#' @export
read_nanoarrow.character <- function(x, ..., lazy = FALSE) {
  # Only a single path or URL can be turned into a connection
  if (length(x) != 1) {
    stop(sprintf("Can't interpret character(%d) as file path", length(x)))
  }

  # Pick the connection constructor ("file", "url", "gzfile", "bzfile" or
  # "unz") from the path. unz() additionally needs the name of the archive
  # member to read.
  con_type <- guess_connection_type(x)
  if (con_type == "unz") {
    con <- do.call(con_type, list(x, filename = guess_zip_filename(x)))
  } else {
    con <- do.call(con_type, list(x))
  }

  # The connection created above is owned by this function. The connection
  # method closes it when creating the stream fails after a successful open(),
  # but not when open() itself fails (e.g., the file does not exist). Close it
  # here on any failure so it does not linger until garbage collection;
  # closing a connection that was already closed signals an error, which is
  # deliberately ignored.
  reader <- tryCatch(
    # Read lazily first: helps with error reporting when reading invalid files
    read_nanoarrow(con, lazy = TRUE),
    error = function(e) {
      tryCatch(close(con), error = function(close_error) NULL)
      stop(e)
    }
  )

  check_stream_if_requested(reader, lazy)
}

#' @export
read_nanoarrow.connection <- function(x, ..., lazy = FALSE) {
  if (isOpen(x)) {
    # Connections that are already open are read from as they are
    reader <- .Call(nanoarrow_c_ipc_array_reader_connection, x)
  } else {
    # Unopened connections should be opened in binary mode
    open(x, "rb")

    # If the native reader cannot be created, undo the open() before
    # re-signalling the original error
    stream <- tryCatch(
      .Call(nanoarrow_c_ipc_array_reader_connection, x),
      error = function(err) {
        close(x)
        stop(err)
      }
    )

    # Close the connection when the array stream is released. The callback
    # gets a minimal environment holding only the connection so that it does
    # not capture this function's whole frame.
    close_env <- new.env(parent = baseenv())
    assign("x", x, envir = close_env)

    close_on_release <- function() {
      close(x)
    }
    environment(close_on_release) <- close_env

    reader <- array_stream_set_finalizer(stream, close_on_release)
  }

  check_stream_if_requested(reader, lazy)
}

#' @rdname read_nanoarrow
#' @export
example_ipc_stream <- function() {
  # Returns a raw() vector holding a tiny hard-coded serialized stream: one
  # schema message followed by one record batch message, concatenated in that
  # order. Intended as input for read_nanoarrow() in examples.
  #
  # data.frame(some_col = c(1L, 2L, 3L)) as a serialized schema/batch
  #
  # The schema bytes also embed the ASCII strings some_key/some_value and
  # some_key_field/some_value_field.
  # NOTE(review): these look like schema-level and field-level metadata
  # key/value pairs -- confirm against the Arrow IPC format.
  schema <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
    0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
    0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff,
    0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00,
    0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00,
    0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00,
    0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00,
    0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c,
    0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65,
    0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))

  # The record batch message; its trailing bytes are the values 1, 2, 3
  # stored as 4-byte little-endian integers.
  batch <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))

  # Schema message first, then the batch
  c(schema, batch)
}

# Eagerly validate a freshly created array stream unless `lazy` is set.
#
# When `lazy` is FALSE the reader's schema is requested once and discarded so
# that an invalid stream fails immediately. On failure the reader is released
# and the error is re-signalled with its call replaced by the call of the
# function that invoked this helper. Returns `reader` in all non-error cases.
check_stream_if_requested <- function(reader, lazy) {
  eager <- !lazy

  if (eager) {
    # Report error as coming from read_nanoarrow() always: remember the
    # caller's call so the handler below can attach it to the condition
    caller_call <- sys.call(-1)

    release_and_rethrow <- function(err) {
      reader$release()
      err$call <- caller_call
      stop(err)
    }

    tryCatch(reader$get_schema(), error = release_and_rethrow)
  }

  reader
}

# Choose the name of the connection constructor to use for a path or URL.
#
# Returns "url" for anything containing "://", the matching decompressing
# constructor ("gzfile", "bzfile" or "unz") for paths ending in .gz, .bz2 or
# .zip, and "file" otherwise. Compressed URLs are rejected with an error.
guess_connection_type <- function(x) {
  has_scheme <- grepl("://", x)

  # switch() yields NULL when the extension is not a known compression suffix
  extension <- tools::file_ext(x)
  decompressor <- switch(
    extension,
    gz = "gzfile",
    bz2 = "bzfile",
    zip = "unz"
  )

  if (is.null(decompressor)) {
    return(if (has_scheme) "url" else "file")
  }

  if (has_scheme) {
    stop("Reading compressed streams from URLs is not supported")
  }

  decompressor
}

# Find the name of the single member of a zip archive.
#
# Lists the archive at path `x` and returns the member name when there is
# exactly one; any other member count is an error.
guess_zip_filename <- function(x) {
  listing <- utils::unzip(x, list = TRUE)
  # The first column of the listing holds the member names
  members <- listing[[1]]
  n_members <- length(members)

  if (n_members == 1) {
    return(members)
  }

  stop(
    sprintf(
      "Unzip only supported of archives with exactly one file (found %d)",
      n_members
    )
  )
}

Try the nanoarrow package in your browser

Any scripts or data that you put into this service are public.

nanoarrow documentation built on June 22, 2024, 9:37 a.m.