# R/tables.R

# Defines functions collect.shard_table_handle collect.shard_dataset collect.shard_row_groups collect as_tibble.shard_dataset as_tibble.shard_row_groups iterate_row_groups table_finalize.shard_table_sink table_write.shard_table_sink .table_part_native_decode_rows .table_part_native_decode .table_part_native_encode .table_part_native_decode_string_rows .table_part_native_decode_string .table_part_native_encode_string .sink_atomic_write_rds .sink_write_manifest .schema_to_plain_list .sink_atomic_save_rds .sink_manifest_json_path .sink_manifest_rds_path .sink_part_path table_sink as_tibble.shard_table_handle as_tibble.shard_table_buffer as_tibble table_finalize.shard_table_buffer table_finalize table_write.shard_table_buffer .table_write_buffer_impl table_write .table_rows_resolve .table_rows_len .table_cast row_layout table_buffer schema .coltype_buffer_type .coltype_bytes factor_col string_col raw_col bool float64 int32 .coltype table_reset_diagnostics table_diagnostics

# Documented in as_tibble as_tibble.shard_dataset as_tibble.shard_row_groups as_tibble.shard_table_buffer as_tibble.shard_table_handle bool collect collect.shard_dataset collect.shard_row_groups collect.shard_table_handle factor_col float64 int32 iterate_row_groups raw_col row_layout schema string_col table_buffer table_diagnostics table_finalize table_finalize.shard_table_buffer table_finalize.shard_table_sink table_sink table_write table_write.shard_table_buffer table_write.shard_table_sink

# Schema-driven columnar table outputs.
#
# v1.0 scope: fixed-row table buffers backed by shard buffers, with strict
# validation and disjoint row-range writes.

# Per-process table write diagnostics (aggregated into shard_map via dispatch
# deltas, similar to views/buffers).
# `writes` and `rows` are integer counters; `bytes` is stored as a double so
# large payload totals do not overflow R's 32-bit integers. An environment is
# used so the write paths can update the counters in place.
.table_diag_env <- new.env(parent = emptyenv())
.table_diag_env$writes <- 0L
.table_diag_env$rows <- 0L
.table_diag_env$bytes <- 0

#' Table Diagnostics
#'
#' Snapshot of the per-process table write counters: number of table_write()
#' calls, rows written, and bytes written. shard_map diffs these counters
#' around a run to produce run-level diagnostics in copy_report().
#'
#' @return A list with `writes`, `rows`, and `bytes`.
#' @export
table_diagnostics <- function() {
  env <- .table_diag_env
  list(
    writes = env$writes,
    rows = env$rows,
    bytes = env$bytes
  )
}

# Reset the per-process table write counters back to their initial state.
table_reset_diagnostics <- function() {
  env <- .table_diag_env
  env$writes <- 0L
  env$rows <- 0L
  env$bytes <- 0
  invisible(NULL)
}

# Internal constructor for column type specs: a classed list carrying a `kind`
# string plus any type-specific metadata (e.g. factor levels).
.coltype <- function(kind, ...) {
  spec <- list(kind = kind)
  extra <- list(...)
  structure(c(spec, extra), class = "shard_coltype")
}

#' Column Types
#'
#' Type constructors for schema-driven table outputs. Each returns a typed
#' column spec consumed by \code{schema()}.
#'
#' @return A `shard_coltype` object.
#' @name coltypes
NULL

#' @rdname coltypes
#' @export
int32 <- function() {
  .coltype("int32")
}

#' @rdname coltypes
#' @export
float64 <- function() {
  .coltype("float64")
}

#' @rdname coltypes
#' @export
bool <- function() {
  .coltype("bool")
}

#' @rdname coltypes
#' @export
raw_col <- function() {
  .coltype("raw")
}

#' @rdname coltypes
#' @export
string_col <- function() {
  .coltype("string")
}

#' Categorical column type
#'
#' Stores factors as int32 codes plus shared levels metadata.
#'
#' @param levels Character vector of allowed levels.
#' @return A `shard_coltype` object.
#' @export
factor_col <- function(levels) {
  valid <- is.character(levels) && length(levels) >= 1L && !anyNA(levels)
  if (!valid) {
    stop("levels must be a non-empty character vector without NA", call. = FALSE)
  }
  # Duplicate levels are collapsed; first occurrence fixes the code ordering.
  .coltype("factor", levels = unique(levels))
}

# Per-element payload size estimate in bytes for a column type. Returns
# NA_real_ for variable-width ("string") and unknown kinds; the logical size
# for "bool" is an estimate for table byte accounting, not storage width.
.coltype_bytes <- function(ct) {
  fixed_sizes <- c("int32" = 4, "float64" = 8, "bool" = 1, "raw" = 1, "factor" = 4)
  kind <- ct$kind
  if (kind %in% names(fixed_sizes)) {
    unname(fixed_sizes[[kind]])
  } else {
    NA_real_
  }
}

# Map a column type to the underlying shard buffer() storage type. Strings are
# rejected here because fixed-row table buffers cannot hold variable-width
# payloads; unknown kinds are an error.
.coltype_buffer_type <- function(ct) {
  kind <- ct$kind
  if (identical(kind, "string")) {
    stop("string columns are not supported in fixed table_buffer() (use table_sink())", call. = FALSE)
  }
  storage <- list(
    int32 = "integer",
    float64 = "double",
    bool = "logical",
    raw = "raw",
    factor = "integer"
  )[[kind]]
  if (is.null(storage)) {
    stop("Unsupported column type: ", kind, call. = FALSE)
  }
  storage
}

#' Define a table schema
#'
#' A schema is a named set of columns with explicit types. It is used to
#' allocate table buffers and validate writes.
#'
#' @param ... Named columns with type specs (e.g., \code{int32()}, \code{float64()}).
#' @return A \code{shard_schema} object.
#' @export
#' @examples
#' s <- schema(x = float64(), y = int32(), label = string_col())
#' s
schema <- function(...) {
  cols <- list(...)
  if (length(cols) == 0) stop("schema() requires at least one column", call. = FALSE)

  nms <- names(cols)
  if (is.null(nms) || any(nms == "")) {
    stop("schema() requires named columns", call. = FALSE)
  }
  if (anyDuplicated(nms) > 0L) stop("schema() column names must be unique", call. = FALSE)

  is_spec <- vapply(cols, inherits, logical(1), what = "shard_coltype")
  if (!all(is_spec)) {
    stop("schema() columns must be type specs (e.g. int32(), float64(), factor_col())", call. = FALSE)
  }

  structure(list(columns = cols), class = "shard_schema")
}

#' Allocate a fixed-row table buffer
#'
#' Allocates a columnar table output: one typed buffer per column, each of
#' length \code{nrow}. Intended for lock-free disjoint row-range writes in shard_map.
#'
#' @param schema A \code{shard_schema}.
#' @param nrow Total number of rows in the final table.
#' @param backing Backing type for buffers (\code{"auto"}, \code{"mmap"}, \code{"shm"}).
#' @return A \code{shard_table_buffer} object with one shared buffer per schema column.
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), y = int32())
#' tb <- table_buffer(s, nrow = 100L)
#' }
table_buffer <- function(schema, nrow, backing = c("auto", "mmap", "shm")) {
  backing <- match.arg(backing)
  if (!inherits(schema, "shard_schema")) stop("schema must be a shard_schema", call. = FALSE)

  nrow <- as.integer(nrow)
  if (is.na(nrow) || nrow < 1L) stop("nrow must be a positive integer", call. = FALSE)

  # One shared buffer per schema column; lapply preserves the column names.
  column_bufs <- lapply(schema$columns, function(ct) {
    buffer(.coltype_buffer_type(ct), dim = nrow, init = NULL, backing = backing)
  })

  structure(
    list(schema = schema, nrow = nrow, backing = backing, columns = column_bufs),
    class = "shard_table_buffer"
  )
}

#' Row layout for fixed-row table outputs
#'
#' Computes disjoint row ranges for each shard via prefix-sum, enabling lock-free
#' writes where each shard writes to a unique region.
#'
#' @param shards A \code{shard_descriptor}.
#' @param rows_per_shard Either a scalar integer or a function(shard)->integer.
#' @return A named list mapping shard id (character) to an \code{idx_range(start, end)};
#'   entries for zero-row shards are \code{NULL}.
#' @export
#' @examples
#' \donttest{
#' sh <- shards(100, block_size = 25)
#' layout <- row_layout(sh, rows_per_shard = 25L)
#' }
row_layout <- function(shards, rows_per_shard) {
  if (!inherits(shards, "shard_descriptor")) stop("shards must be a shard_descriptor", call. = FALSE)
  if (missing(rows_per_shard)) stop("rows_per_shard is required", call. = FALSE)

  num <- shards$num_shards
  sizes <- integer(num)
  if (is.function(rows_per_shard)) {
    for (i in seq_len(num)) {
      n <- as.integer(rows_per_shard(shards$shards[[i]]))
      if (is.na(n) || n < 0L) stop("rows_per_shard returned invalid size for shard ", i, call. = FALSE)
      sizes[i] <- n
    }
  } else {
    n <- as.integer(rows_per_shard)
    if (is.na(n) || n < 0L) stop("rows_per_shard must be >= 0", call. = FALSE)
    sizes[] <- n
  }

  layout <- vector("list", num)
  cur <- 1L
  for (i in seq_len(num)) {
    n <- sizes[i]
    if (n > 0L) {
      layout[[i]] <- idx_range(cur, cur + n - 1L)
      cur <- cur + n
    }
    # Zero-row shards keep their pre-allocated NULL slot. Do NOT assign NULL
    # with `layout[[i]] <- NULL`: for lists that *deletes* the element, which
    # shortens the list and makes the names() assignment below fail whenever
    # the trailing shard(s) have zero rows.
  }
  names(layout) <- as.character(seq_len(num))
  layout
}

# Cast a written column to the storage representation for its column type.
# Factors are mapped to 1-based integer codes against the schema levels;
# integer/numeric input for factor columns is assumed to already be codes.
.table_cast <- function(ct, x) {
  kind <- ct$kind
  if (kind == "int32") return(as.integer(x))
  if (kind == "float64") return(as.double(x))
  if (kind == "bool") return(as.logical(x))
  if (kind == "raw") return(as.raw(x))
  if (kind == "string") return(as.character(x))
  if (kind == "factor") {
    lev <- ct$levels
    if (is.factor(x)) {
      # Levels must agree as sets; codes are remapped to schema ordering.
      if (!setequal(levels(x), lev)) {
        stop("factor levels mismatch in write", call. = FALSE)
      }
      return(match(as.character(x), lev))
    }
    if (is.character(x)) return(match(x, lev))
    if (is.integer(x) || is.numeric(x)) return(as.integer(x))
    stop("Unsupported factor column input type", call. = FALSE)
  }
  stop("Unsupported column type: ", kind, call. = FALSE)
}

# Number of rows selected by a row selector: NULL selects nothing, an
# idx_range selects end - start + 1 rows, otherwise the selector is treated
# as an integer index vector.
.table_rows_len <- function(rows) {
  if (is.null(rows)) {
    0L
  } else if (inherits(rows, "shard_idx_range")) {
    as.integer(rows$end - rows$start + 1L)
  } else {
    length(as.integer(rows))
  }
}

# Expand a row selector to an explicit integer index vector (NULL stays NULL).
.table_rows_resolve <- function(rows) {
  if (is.null(rows)) {
    NULL
  } else if (inherits(rows, "shard_idx_range")) {
    rows$start:rows$end
  } else {
    as.integer(rows)
  }
}

#' Write tabular results into a table buffer or sink
#'
#' \code{table_write()} is the common write path for shard table outputs:
#' \itemize{
#'   \item For fixed-size outputs, write into a \code{shard_table_buffer} using a row selector.
#'   \item For variable-size outputs, write into a \code{shard_table_sink} using a shard id.
#' }
#'
#' @param target A \code{shard_table_buffer} or \code{shard_table_sink}.
#' @param rows_or_shard_id For buffers: row selector (idx_range or integer vector).
#'   For sinks: shard id (integer).
#' @param data A data.frame or named list matching the schema columns.
#' @param ... Reserved for future extensions.
#' @return \code{NULL} (invisibly).
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), y = int32())
#' tb <- table_buffer(s, nrow = 10L)
#' table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
#' }
table_write <- function(target, rows_or_shard_id, data, ...) {
  # S3 generic: dispatches to table_write.shard_table_buffer (fixed-row
  # buffers) or table_write.shard_table_sink (per-shard row-group files).
  UseMethod("table_write")
}

# Shared implementation for table_write() on fixed-row buffers: validates the
# row selector and data columns against the schema, casts each column, writes
# the per-column slices, and bumps the per-process diagnostics counters.
.table_write_buffer_impl <- function(target, rows, data) {
  if (!inherits(target, "shard_table_buffer")) stop("target must be a shard_table_buffer", call. = FALSE)
  if (missing(rows)) stop("rows is required", call. = FALSE)

  row_idx <- .table_rows_resolve(rows)
  n_rows <- .table_rows_len(rows)
  # Empty selection is a no-op.
  if (n_rows < 1L) return(invisible(NULL))
  out_of_range <- anyNA(row_idx) || any(row_idx < 1L) || any(row_idx > target$nrow)
  if (out_of_range) {
    stop("rows contain out-of-range indices", call. = FALSE)
  }

  if (is.data.frame(data)) {
    data <- as.list(data)
  }
  if (!is.list(data) || is.null(names(data))) {
    stop("data must be a data.frame or named list", call. = FALSE)
  }

  sch <- target$schema$columns
  if (!setequal(names(data), names(sch))) {
    missing_cols <- setdiff(names(sch), names(data))
    extra_cols <- setdiff(names(data), names(sch))
    msg <- "table_write() data columns must match schema"
    if (length(missing_cols)) msg <- paste0(msg, "; missing: ", paste(missing_cols, collapse = ", "))
    if (length(extra_cols)) msg <- paste0(msg, "; extra: ", paste(extra_cols, collapse = ", "))
    stop(msg, call. = FALSE)
  }

  # Cast and write each column slice; track estimated payload bytes for the
  # fixed-width types (strings/unknown are excluded from the byte count).
  bytes_written <- 0
  for (col_name in names(sch)) {
    ct <- sch[[col_name]]
    col <- data[[col_name]]
    if (length(col) != n_rows) {
      stop("Column '", col_name, "' length mismatch (expected ", n_rows, ")", call. = FALSE)
    }
    target$columns[[col_name]][row_idx] <- .table_cast(ct, col)

    per_elem <- .coltype_bytes(ct)
    if (is.finite(per_elem)) {
      bytes_written <- bytes_written + as.double(n_rows) * as.double(per_elem)
    }
  }

  .table_diag_env$writes <- .table_diag_env$writes + 1L
  .table_diag_env$rows <- .table_diag_env$rows + n_rows
  .table_diag_env$bytes <- .table_diag_env$bytes + bytes_written

  invisible(NULL)
}

#' Write into a table buffer
#'
#' @param target A `shard_table_buffer`.
#' @param rows_or_shard_id Row selector (idx_range or integer vector).
#' @param data A data.frame or named list matching the schema columns.
#' @param ... Reserved for future extensions.
#' @return NULL (invisibly).
#' @export
table_write.shard_table_buffer <- function(target, rows_or_shard_id, data, ...) {
  # For buffers, the second argument is a row selector (not a shard id);
  # all validation and diagnostics live in the shared implementation.
  .table_write_buffer_impl(target, rows = rows_or_shard_id, data = data)
}

#' Finalize a table buffer or sink
#'
#' For a \code{shard_table_buffer}, this returns a lightweight in-memory handle (or a
#' materialized data.frame/tibble, depending on \code{materialize}).
#'
#' For a \code{shard_table_sink}, this returns a row-group handle referencing the
#' written partitions (or materializes them if requested).
#'
#' @param target A \code{shard_table_buffer} or \code{shard_table_sink}.
#' @param materialize \code{"never"}, \code{"auto"}, or \code{"always"}.
#' @param max_bytes For \code{"auto"}, materialize only if estimated bytes <= max_bytes.
#' @param ... Reserved for future extensions.
#' @return A \code{shard_table_handle}, \code{shard_row_groups}, or materialized
#'   data.frame/tibble depending on \code{target} type and \code{materialize}.
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), y = int32())
#' tb <- table_buffer(s, nrow = 5L)
#' table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
#' handle <- table_finalize(tb)
#' }
table_finalize <- function(target, materialize = c("never", "auto", "always"), max_bytes = 256 * 1024^2, ...) {
  # S3 generic: dispatches on the target class (buffer vs sink).
  UseMethod("table_finalize")
}

#' Finalize a table buffer
#'
#' Wraps the buffer's columns in a lightweight `shard_table_handle`, and
#' optionally materializes it to a data.frame/tibble based on an estimate of
#' the fixed-width payload size.
#'
#' @param target A `shard_table_buffer`.
#' @param materialize `"never"`, `"auto"`, or `"always"`.
#' @param max_bytes For `"auto"`, materialize only if estimated bytes <= max_bytes.
#' @param ... Reserved for future extensions.
#' @return A `shard_table_handle` or a materialized data.frame/tibble.
#' @export
table_finalize.shard_table_buffer <- function(target, materialize = c("never", "auto", "always"), max_bytes = 256 * 1024^2, ...) {
  materialize <- match.arg(materialize)

  handle <- structure(
    list(schema = target$schema, nrow = target$nrow, columns = target$columns),
    class = "shard_table_handle"
  )

  # Estimate payload from fixed-width column sizes (variable-width columns
  # contribute nothing to the estimate).
  per_col <- vapply(target$schema$columns, .coltype_bytes, numeric(1))
  est_bytes <- sum(as.double(target$nrow) * per_col[is.finite(per_col)])

  wants_df <- switch(materialize,
    never = FALSE,
    always = TRUE,
    auto = isTRUE(est_bytes <= max_bytes)
  )

  if (wants_df) as_tibble(handle, max_bytes = max_bytes) else handle
}

#' Materialize a shard table handle as a data.frame/tibble
#'
#' @param x A shard table object.
#' @param max_bytes Warn if estimated payload exceeds this threshold.
#' @param ... Reserved for future extensions.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), y = int32())
#' tb <- table_buffer(s, nrow = 5L)
#' table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
#' df <- as_tibble(tb)
#' }
as_tibble <- function(x, max_bytes = 256 * 1024^2, ...) {
  # S3 generic defined here for shard table objects. NOTE(review): this masks
  # tibble::as_tibble when both are attached — presumably intentional for the
  # package's API; confirm the NAMESPACE handles the conflict.
  UseMethod("as_tibble")
}

#' Materialize a fixed table handle or buffer
#'
#' Converts a `shard_table_buffer` to an in-memory data.frame (or tibble if
#' the tibble package is installed) by first wrapping it in a handle.
#'
#' @param x A `shard_table_handle` or `shard_table_buffer`.
#' @param max_bytes Warn if estimated payload exceeds this threshold.
#' @param ... Reserved for future extensions.
#' @return A data.frame (or tibble).
#' @export
as_tibble.shard_table_buffer <- function(x, max_bytes = 256 * 1024^2, ...) {
  handle <- structure(
    list(schema = x$schema, nrow = x$nrow, columns = x$columns),
    class = "shard_table_handle"
  )
  as_tibble(handle, max_bytes = max_bytes)
}

#' Materialize a table handle into a data.frame/tibble
#'
#' Reads every column buffer in full, reconstructs factors from their integer
#' codes, and assembles a data.frame. Warns (but proceeds) when the estimated
#' fixed-width payload exceeds `max_bytes`.
#'
#' @param x A \code{shard_table_handle}.
#' @param max_bytes Warn if estimated payload exceeds this threshold.
#' @param ... Reserved for future extensions.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
as_tibble.shard_table_handle <- function(x, max_bytes = 256 * 1024^2, ...) {
  sch <- x$schema$columns
  n <- x$nrow

  est <- 0
  for (nm in names(sch)) {
    per_elem <- .coltype_bytes(sch[[nm]])
    if (is.finite(per_elem)) est <- est + as.double(n) * as.double(per_elem)
  }
  if (is.finite(est) && est > max_bytes) {
    warning("Materializing a large table (", format_bytes(est), "): consider writing to disk in table_sink().",
            call. = FALSE)
  }

  cols <- lapply(names(sch), function(nm) {
    ct <- sch[[nm]]
    vec <- x$columns[[nm]][]  # full buffer read
    if (ct$kind == "factor") {
      # Integer codes -> labels; out-of-range/NA codes become NA levels.
      lev <- ct$levels
      codes <- as.integer(vec)
      labels <- rep(NA_character_, length(codes))
      valid <- !is.na(codes) & codes >= 1L & codes <= length(lev)
      labels[valid] <- lev[codes[valid]]
      factor(labels, levels = lev)
    } else if (ct$kind == "bool") {
      as.logical(vec)
    } else {
      vec
    }
  })
  names(cols) <- names(sch)
  df <- as.data.frame(cols, stringsAsFactors = FALSE)

  if (pkg_available("tibble")) tibble::as_tibble(df) else df
}

#' Create a table sink for row-group or partitioned outputs
#'
#' A table sink supports variable-sized outputs without returning large
#' data.frames to the master. Each shard writes a separate row-group file.
#'
#' v1.1 implementation notes:
#' \itemize{
#'   \item Storage format is per-shard RDS (portable, CRAN-friendly).
#'   \item This guarantees bounded master memory during execution; final collection
#'     may still be large if you materialize.
#' }
#'
#' @param schema A \code{shard_schema}. If \code{NULL} (the default), a schema-less
#'   sink is created (RDS format only). This is primarily intended for
#'   doShard/foreach compatibility where output schemas may not be known in advance.
#' @param mode \code{"row_groups"} (temp, managed) or \code{"partitioned"} (persistent path).
#' @param path Directory to write row-group files. If NULL, a temp dir is created.
#' @param format Storage format for partitions: \code{"rds"} (data.frame RDS),
#'   \code{"native"} (columnar encoding with string offsets+bytes), or \code{"auto"}
#'   (selects \code{"native"} if the schema contains \code{string_col()}; otherwise \code{"rds"}).
#' @return A \code{shard_table_sink} object.
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), label = string_col())
#' sink <- table_sink(s, mode = "row_groups")
#' }
table_sink <- function(schema = NULL,
                       mode = c("row_groups", "partitioned"),
                       path = NULL,
                       format = c("auto", "rds", "native")) {
  mode <- match.arg(mode)
  format <- match.arg(format)
  if (!is.null(schema) && !inherits(schema, "shard_schema")) {
    stop("schema must be a shard_schema (or NULL for schema-less RDS sinks)", call. = FALSE)
  }

  # Schema-less sinks are allowed only for the simple RDS format, primarily for
  # doShard/foreach compatibility. Schema-driven validation/encoding requires a
  # shard_schema, so any other requested format is silently coerced to "rds".
  if (is.null(schema) && !identical(format, "rds")) {
    format <- "rds"
  }

  if (identical(format, "auto")) {
    # "auto" picks the native columnar encoding only when strings are present.
    has_string <- any(vapply(schema$columns, function(ct) identical(ct$kind, "string"), logical(1)))
    format <- if (has_string) "native" else "rds"
  }

  if (is.null(path)) {
    path <- file.path(tempdir(), paste0("shard_table_sink_", unique_id()))
  }
  if (!dir.exists(path)) dir.create(path, recursive = TRUE, showWarnings = FALSE)

  structure(
    list(schema = schema, mode = mode, path = path, format = format),
    class = "shard_table_sink"
  )
}

# Path of the per-shard partition file inside the sink directory; shard ids
# are zero-padded to 6 digits so lexicographic file order matches shard order.
.sink_part_path <- function(sink, shard_id) {
  part_name <- sprintf("part-%06d.rds", as.integer(shard_id))
  file.path(sink$path, part_name)
}

# Path of the RDS manifest written at finalize time.
.sink_manifest_rds_path <- function(sink) {
  file.path(sink$path, "manifest.rds")
}

# Path of the optional JSON manifest (written only when jsonlite is available).
.sink_manifest_json_path <- function(sink) {
  file.path(sink$path, "manifest.json")
}

# Atomically persist `obj` at `path`: serialize to a unique temp file next to
# the target, then rename into place so readers never see a partial file.
.sink_atomic_save_rds <- function(obj, path) {
  tmp <- paste0(path, ".tmp_", Sys.getpid(), "_", sample.int(1e9, 1))
  saveRDS(obj, tmp)
  if (!isTRUE(file.rename(tmp, path))) {
    unlink(tmp)
    stop("Failed to finalize write", call. = FALSE)
  }
  invisible(NULL)
}

# Convert a shard_schema into plain (class-free) nested lists so it serializes
# stably to JSON. `version`/`schema_hash` fall back to defaults when absent.
.schema_to_plain_list <- function(schema) {
  if (!inherits(schema, "shard_schema")) {
    stop("schema must be a shard_schema", call. = FALSE)
  }
  plain_cols <- lapply(schema$columns, function(ct) {
    # shard_coltype objects are simple lists; strip the class for JSON.
    unclass(as.list(ct))
  })
  list(
    version = schema$version %||% 1L,
    schema_hash = schema$schema_hash %||% NULL,
    columns = plain_cols
  )
}

# Write the sink's manifest: always an RDS manifest, plus a JSON manifest when
# jsonlite is installed. Both writes are atomic (temp file + rename).
.sink_write_manifest <- function(sink, handle) {
  manifest <- list(
    version = 1L,
    created = format(Sys.time(), "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"),
    mode = sink$mode,
    format = sink$format,
    schema = sink$schema,
    files = basename(handle$files %||% character(0))
  )

  .sink_atomic_save_rds(manifest, .sink_manifest_rds_path(sink))

  # Optional JSON manifest (Suggests: jsonlite). Kept behind requireNamespace
  # so shard core stays CRAN-friendly.
  if (requireNamespace("jsonlite", quietly = TRUE)) {
    json_manifest <- manifest
    if (!is.null(sink$schema)) {
      # JSON needs a class-free schema representation.
      json_manifest$schema <- .schema_to_plain_list(sink$schema)
    }
    json_path <- .sink_manifest_json_path(sink)
    tmp <- paste0(json_path, ".tmp_", Sys.getpid(), "_", sample.int(1e9, 1))
    jsonlite::write_json(json_manifest, tmp, auto_unbox = TRUE, pretty = TRUE)
    if (!isTRUE(file.rename(tmp, json_path))) unlink(tmp)
  }

  invisible(NULL)
}

# Atomic RDS write for row-group partitions (temp file + rename into place).
# Same mechanism as .sink_atomic_save_rds but with a partition-specific error.
.sink_atomic_write_rds <- function(obj, path) {
  tmp <- paste0(path, ".tmp_", Sys.getpid(), "_", sample.int(1e9, 1))
  saveRDS(obj, tmp)
  if (!isTRUE(file.rename(tmp, path))) {
    unlink(tmp)
    stop("Failed to finalize row-group write", call. = FALSE)
  }
  invisible(NULL)
}

# Encode a character vector into a shard_string_blob: a flat UTF-8 byte
# payload, 0-based end-exclusive offsets (offsets[i]..offsets[i+1] delimit
# string i), and a logical NA mask. NAs are encoded as empty payloads and
# restored from the mask on decode.
.table_part_native_encode_string <- function(x) {
  x <- as.character(x)
  n <- length(x)
  is_na <- is.na(x)
  x0 <- x
  x0[is_na] <- ""

  # Ensure stable byte encoding across platforms/locales.
  x0 <- enc2utf8(x0)

  # Encode each string exactly once; reuse the raw vectors for both the
  # lengths and the concatenated payload.
  raw_list <- lapply(x0, charToRaw)
  lens <- vapply(raw_list, length, integer(1))

  # Sum in double: an integer sum() would overflow to NA (with a warning) for
  # payloads >= 2^31 bytes, which would then crash the guard below instead of
  # raising the intended error.
  total <- sum(as.double(lens))
  if (total > .Machine$integer.max) {
    stop("String payload too large for native encoding", call. = FALSE)
  }

  bytes <- if (total > 0) unlist(raw_list, use.names = FALSE) else raw(0)
  # cumsum stays within integer range here because total was checked above.
  offsets <- c(0L, cumsum(lens))

  structure(
    list(offsets = offsets, bytes = bytes, is_na = is_na),
    class = "shard_string_blob"
  )
}

# Decode a shard_string_blob back to a character vector. Offsets are 0-based
# and end-exclusive; NA entries are restored from the is_na mask.
.table_part_native_decode_string <- function(blob) {
  if (!inherits(blob, "shard_string_blob")) stop("blob must be a shard_string_blob", call. = FALSE)
  offsets <- blob$offsets
  bytes <- blob$bytes
  is_na <- blob$is_na
  n <- length(is_na)

  out <- character(n)
  for (i in seq_len(n)) {
    if (isTRUE(is_na[[i]])) {
      out[[i]] <- NA_character_
      next
    }
    s <- offsets[[i]]
    e <- offsets[[i + 1L]]
    if (e <= s) {
      out[[i]] <- ""
    } else {
      out[[i]] <- rawToChar(bytes[(s + 1L):e])
    }
  }
  # The encoder stored UTF-8 bytes (enc2utf8); declare the encoding so
  # non-ASCII strings round-trip correctly in non-UTF-8 locales.
  Encoding(out) <- "UTF-8"
  out
}

# Decode only the selected rows of a shard_string_blob, preserving the order
# of `rows`. Invalid indices (NA / out of range) are silently dropped.
.table_part_native_decode_string_rows <- function(blob, rows) {
  if (!inherits(blob, "shard_string_blob")) stop("blob must be a shard_string_blob", call. = FALSE)
  rows <- as.integer(rows)
  rows <- rows[!is.na(rows) & rows >= 1L & rows <= length(blob$is_na)]
  if (length(rows) == 0) return(character(0))

  offsets <- blob$offsets
  bytes <- blob$bytes
  is_na <- blob$is_na

  out <- character(length(rows))
  for (j in seq_along(rows)) {
    i <- rows[[j]]
    if (isTRUE(is_na[[i]])) {
      out[[j]] <- NA_character_
      next
    }
    s <- offsets[[i]]
    e <- offsets[[i + 1L]]
    if (e <= s) {
      out[[j]] <- ""
    } else {
      out[[j]] <- rawToChar(bytes[(s + 1L):e])
    }
  }
  # The encoder stored UTF-8 bytes (enc2utf8); declare the encoding so
  # non-ASCII strings round-trip correctly in non-UTF-8 locales.
  Encoding(out) <- "UTF-8"
  out
}

# Encode a data.frame into a native partition: strings become
# shard_string_blob payloads, factors become their integer codes, and all
# other columns are stored as-is.
.table_part_native_encode <- function(df, schema) {
  if (!is.data.frame(df)) stop("df must be a data.frame", call. = FALSE)
  if (!inherits(schema, "shard_schema")) stop("schema must be a shard_schema", call. = FALSE)

  cols <- schema$columns
  encoded <- lapply(names(cols), function(nm) {
    ct <- cols[[nm]]
    v <- df[[nm]]
    switch(ct$kind,
      "string" = .table_part_native_encode_string(v),
      "factor" = as.integer(v),
      v
    )
  })
  names(encoded) <- names(cols)

  structure(
    list(encoding = "native1", nrow = nrow(df), columns = encoded),
    class = "shard_table_part_native"
  )
}

# Decode a full native partition into a data.frame: string blobs are expanded,
# factor codes are mapped back to their schema levels (out-of-range codes
# become NA), and bool columns are coerced to logical.
.table_part_native_decode <- function(part, schema) {
  if (!inherits(part, "shard_table_part_native")) stop("part must be shard_table_part_native", call. = FALSE)
  if (!inherits(schema, "shard_schema")) stop("schema must be a shard_schema", call. = FALSE)

  cols <- schema$columns
  decoded <- lapply(names(cols), function(nm) {
    ct <- cols[[nm]]
    v <- part$columns[[nm]]
    if (ct$kind == "string") {
      .table_part_native_decode_string(v)
    } else if (ct$kind == "factor") {
      lev <- ct$levels
      codes <- as.integer(v)
      labels <- rep(NA_character_, length(codes))
      valid <- !is.na(codes) & codes >= 1L & codes <= length(lev)
      labels[valid] <- lev[codes[valid]]
      factor(labels, levels = lev)
    } else if (ct$kind == "bool") {
      as.logical(v)
    } else {
      v
    }
  })
  names(decoded) <- names(cols)
  as.data.frame(decoded, stringsAsFactors = FALSE)
}

# Decode only the selected rows of a native partition into a data.frame.
# Invalid row indices (NA / out of range) are dropped; selection order is kept.
.table_part_native_decode_rows <- function(part, schema, rows) {
  if (!inherits(part, "shard_table_part_native")) stop("part must be shard_table_part_native", call. = FALSE)
  if (!inherits(schema, "shard_schema")) stop("schema must be a shard_schema", call. = FALSE)
  rows <- as.integer(rows)
  rows <- rows[!is.na(rows) & rows >= 1L & rows <= (part$nrow %||% 0L)]

  cols <- schema$columns
  picked <- lapply(names(cols), function(nm) {
    ct <- cols[[nm]]
    v <- part$columns[[nm]]
    if (ct$kind == "string") {
      .table_part_native_decode_string_rows(v, rows)
    } else if (ct$kind == "factor") {
      lev <- ct$levels
      codes <- as.integer(v)[rows]
      labels <- rep(NA_character_, length(codes))
      valid <- !is.na(codes) & codes >= 1L & codes <= length(lev)
      labels[valid] <- lev[codes[valid]]
      factor(labels, levels = lev)
    } else if (ct$kind == "bool") {
      as.logical(v)[rows]
    } else {
      v[rows]
    }
  })
  names(picked) <- names(cols)
  as.data.frame(picked, stringsAsFactors = FALSE)
}

#' Write a shard's row-group output
#'
#' Persists one shard's data.frame as a standalone partition file inside the
#' sink directory. When the sink has a schema, columns are validated and cast
#' before writing; schema-less sinks persist the data.frame as-is.
#'
#' @param target A `shard_table_sink`.
#' @param rows_or_shard_id Integer shard id used to name the row-group file.
#' @param data A data.frame matching the sink schema.
#' @param ... Reserved for future extensions.
#' @return NULL (invisibly).
#' @export
table_write.shard_table_sink <- function(target, rows_or_shard_id, data, ...) {
  sink <- target

  shard_id <- rows_or_shard_id
  if (missing(shard_id)) stop("shard_id is required for table_sink writes", call. = FALSE)
  if (!dir.exists(sink$path)) dir.create(sink$path, recursive = TRUE, showWarnings = FALSE)
  if (is.data.frame(data)) {
    # Normalize tibbles/data.tables to a plain data.frame. check.names = FALSE
    # keeps non-syntactic column names intact so they still match the schema
    # (the default check.names = TRUE would mangle e.g. "my col" to "my.col"
    # and trigger a spurious schema-mismatch error below).
    data <- as.data.frame(as.list(data), stringsAsFactors = FALSE, check.names = FALSE)
  }
  if (!is.data.frame(data)) stop("data must be a data.frame", call. = FALSE)

  if (!is.null(sink$schema)) {
    sch <- sink$schema$columns
    if (!setequal(names(data), names(sch))) {
      missing_cols <- setdiff(names(sch), names(data))
      extra_cols <- setdiff(names(data), names(sch))
      msg <- "table_write(sink, ...) data columns must match schema"
      if (length(missing_cols)) msg <- paste0(msg, "; missing: ", paste(missing_cols, collapse = ", "))
      if (length(extra_cols)) msg <- paste0(msg, "; extra: ", paste(extra_cols, collapse = ", "))
      stop(msg, call. = FALSE)
    }

    # Cast + validate each column against its schema type.
    out <- data
    for (nm in names(sch)) {
      ct <- sch[[nm]]
      col <- out[[nm]]
      if (ct$kind == "factor") {
        lev <- ct$levels
        if (is.factor(col)) {
          chr <- as.character(col)
        } else if (is.character(col)) {
          chr <- col
        } else if (is.integer(col) || is.numeric(col)) {
          # Numeric input is interpreted as 1-based codes into the schema levels.
          codes <- as.integer(col)
          chr <- rep(NA_character_, length(codes))
          ok <- !is.na(codes) & codes >= 1L & codes <= length(lev)
          chr[ok] <- lev[codes[ok]]
          if (any(!ok & !is.na(codes))) {
            stop("Invalid factor code in column '", nm, "'", call. = FALSE)
          }
        } else {
          stop("Unsupported factor column input type", call. = FALSE)
        }
        bad <- !is.na(chr) & !(chr %in% lev)
        if (any(bad)) stop("factor levels mismatch in write", call. = FALSE)
        out[[nm]] <- factor(chr, levels = lev)
      } else {
        out[[nm]] <- .table_cast(ct, col)
      }
    }
  } else {
    # Schema-less sink: accept the data.frame as-is and persist as RDS.
    out <- data
  }

  # Atomic per-shard partition write; "native" encodes strings/factors into
  # the columnar representation, otherwise the data.frame is stored directly.
  p <- .sink_part_path(sink, shard_id)
  if (identical(sink$format, "native")) {
    part <- .table_part_native_encode(out, sink$schema)
    .sink_atomic_write_rds(part, p)
  } else {
    .sink_atomic_write_rds(out, p)
  }

  n <- nrow(out)
  sz <- tryCatch(file.info(p)$size, error = function(e) NA_real_)

  # Diagnostics: bytes uses the on-disk partition size when available.
  .table_diag_env$writes <- .table_diag_env$writes + 1L
  .table_diag_env$rows <- .table_diag_env$rows + as.integer(n)
  if (is.finite(sz)) .table_diag_env$bytes <- .table_diag_env$bytes + as.double(sz)

  invisible(NULL)
}

#' Finalize a sink
#'
#' Collects the written partition files into a handle (`shard_row_groups` or
#' `shard_dataset` depending on the sink mode), writes the manifest, and
#' optionally materializes the partitions based on their on-disk sizes.
#'
#' @param target A `shard_table_sink`.
#' @param materialize `"never"`, `"auto"`, or `"always"`.
#' @param max_bytes For `"auto"`, materialize only if estimated bytes <= max_bytes.
#' @param ... Reserved for future extensions.
#' @return A `shard_row_groups` handle (or a materialized data.frame/tibble).
#' @export
table_finalize.shard_table_sink <- function(target, materialize = c("never", "auto", "always"), max_bytes = 256 * 1024^2, ...) {
  materialize <- match.arg(materialize)

  part_files <- sort(list.files(target$path, pattern = "^part-[0-9]+\\.rds$", full.names = TRUE))

  handle_class <- if (identical(target$mode, "partitioned")) "shard_dataset" else "shard_row_groups"
  handle <- structure(
    list(schema = target$schema, path = target$path, files = part_files, format = target$format, mode = target$mode),
    class = handle_class
  )

  # Always emit a manifest so users can inspect/ship outputs without reading
  # partitions, and so future backends can build on this directory layout.
  .sink_write_manifest(target, handle)

  # Decide materialization from on-disk partition sizes (when available).
  sizes <- vapply(part_files, function(f) as.double(file.info(f)$size %||% 0), numeric(1))
  total_bytes <- sum(sizes, na.rm = TRUE)
  should_mat <- switch(materialize,
    never = FALSE,
    always = TRUE,
    auto = isTRUE(total_bytes <= max_bytes)
  )

  if (should_mat) as_tibble(handle, max_bytes = max_bytes) else handle
}

#' Iterate row groups
#'
#' @param x A \code{shard_row_groups} handle.
#' @param decode Logical. If TRUE (default), native-encoded partitions are
#'   decoded to data.frames. If FALSE, native partitions are returned as their
#'   internal representation (advanced).
#' @return A zero-argument iterator function that returns the next data.frame on
#'   each call, or \code{NULL} when exhausted.
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64())
#' sink <- table_sink(s, mode = "row_groups")
#' table_write(sink, 1L, data.frame(x = rnorm(5)))
#' rg <- table_finalize(sink)
#' it <- iterate_row_groups(rg)
#' chunk <- it()
#' }
iterate_row_groups <- function(x, decode = TRUE) {
  if (!(inherits(x, "shard_row_groups") || inherits(x, "shard_dataset"))) {
    stop("x must be a shard_row_groups or shard_dataset handle", call. = FALSE)
  }
  files <- x$files
  schema <- x$schema
  pos <- 0L
  # Closure-based iterator: advances through `files` one partition per call.
  function() {
    pos <<- pos + 1L
    if (pos > length(files)) {
      return(NULL)
    }
    part <- readRDS(files[[pos]])
    if (inherits(part, "shard_table_part_native") && isTRUE(decode)) {
      return(.table_part_native_decode(part, schema))
    }
    part
  }
}

# Extend as_tibble() to accept row-group handles.
#' Materialize a row-groups handle into a data.frame/tibble
#'
#' Reads every partition via the row-group iterator and row-binds the chunks.
#'
#' @param x A \code{shard_row_groups} handle.
#' @param max_bytes Accepted for API consistency; currently unused for row-groups.
#' @param ... Reserved for future extensions.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
as_tibble.shard_row_groups <- function(x, max_bytes = 256 * 1024^2, ...) {
  # max_bytes is accepted for API consistency; row-groups are size-variable
  # and are typically materialized explicitly by users.
  next_chunk <- iterate_row_groups(x)
  chunks <- list()
  repeat {
    chunk <- next_chunk()
    if (is.null(chunk)) break
    chunks[[length(chunks) + 1L]] <- chunk
  }
  df <- if (length(chunks) == 0) data.frame() else do.call(rbind, chunks)
  if (pkg_available("tibble")) tibble::as_tibble(df) else df
}

#' Materialize a dataset handle into a data.frame/tibble
#'
#' @param x A \code{shard_dataset} handle.
#' @param max_bytes Accepted for API consistency.
#' @param ... Reserved for future extensions.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
as_tibble.shard_dataset <- function(x, max_bytes = 256 * 1024^2, ...) {
  # A shard_dataset currently shares the row-groups directory layout
  # (per-shard RDS partitions plus a manifest), so reuse that materializer.
  rg <- structure(unclass(x), class = "shard_row_groups")
  as_tibble(rg, max_bytes = max_bytes)
}

#' Collect a shard table into memory
#'
#' \code{collect()} is a convenience alias for \code{as_tibble()} for shard table outputs.
#'
#' @param x A shard table handle (\code{shard_row_groups}, \code{shard_dataset},
#'   or \code{shard_table_handle}).
#' @param ... Passed to \code{as_tibble()}.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
#' @examples
#' \donttest{
#' s <- schema(x = float64(), y = int32())
#' tb <- table_buffer(s, nrow = 5L)
#' table_write(tb, idx_range(1, 5), data.frame(x = rnorm(5), y = 1:5))
#' handle <- table_finalize(tb)
#' df <- collect(handle)
#' }
collect <- function(x, ...) {
  # S3 generic; every method simply forwards to the matching as_tibble() method.
  UseMethod("collect")
}

#' Collect a row-groups handle into memory
#'
#' @param x A \code{shard_row_groups} handle.
#' @param ... Passed to \code{as_tibble()}.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
collect.shard_row_groups <- function(x, ...) {
  # Thin alias: materialization logic lives in as_tibble.shard_row_groups().
  as_tibble(x, ...)
}

#' Collect a dataset handle into memory
#'
#' @param x A \code{shard_dataset} handle.
#' @param ... Passed to \code{as_tibble()}.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
collect.shard_dataset <- function(x, ...) {
  # Thin alias: materialization logic lives in as_tibble.shard_dataset().
  as_tibble(x, ...)
}

#' Collect a table handle into memory
#'
#' @param x A \code{shard_table_handle}.
#' @param ... Passed to \code{as_tibble()}.
#' @return A data.frame (or tibble if the \code{tibble} package is installed).
#' @export
collect.shard_table_handle <- function(x, ...) {
  # Thin alias: materialization logic lives in as_tibble.shard_table_handle().
  as_tibble(x, ...)
}

# Try the shard package in your browser

# Any scripts or data that you put into this service are public.

# shard documentation built on April 3, 2026, 9:08 a.m.