Nothing
#' @title Zero-Copy Shared Objects
#' @name share
#' @description Create shared memory representations of R objects for efficient
#' parallel access without duplication.
#'
#' @details
#' The `share()` function is the primary high-level API for creating zero-copy
#' shared inputs. When you share an object:
#'
#' \enumerate{
#' \item The object is serialized into a shared memory segment
#' \item The segment is marked read-only (protected)
#' \item A lightweight handle is returned that can be passed to workers
#' \item Workers attach to the segment and deserialize on demand
#' }
#'
#' This approach eliminates per-worker duplication of large inputs. The data
#' exists once in shared memory, and all workers read from the same location.
#'
#' \strong{Immutability Contract}: Shared objects are immutable by design.
#' Any attempt to modify shared data in a worker will fail. This guarantees
#' deterministic behavior and prevents accidental copy-on-write.
#'
#' @seealso \code{\link{segment_create}} for low-level segment operations,
#' \code{\link{pool_create}} for worker pool management.
#'
#' @useDynLib shard, .registration = TRUE
NULL
# Internal helper: check for non-serializable objects
# Returns NULL on success, stops with error on failure
validate_serializable <- function(x, path = "x", seen = NULL) {
# Guard against cycles/aliases during validation (environments can be truly
# cyclic; lists can alias). We only need to ensure we don't recurse forever.
if (is.null(seen)) {
seen <- new.env(parent = emptyenv())
}
# Use address token for memoization; if this fails, fall back to best-effort.
id <- tryCatch(object_identity(x), error = function(e) NULL)
if (!is.null(id) && exists(id, envir = seen, inherits = FALSE)) {
return(invisible(NULL))
}
if (!is.null(id)) {
assign(id, TRUE, envir = seen)
}
# Check for functions (closures)
if (is.function(x)) {
stop("Cannot share functions (closures).\n",
" Found at: ", path, "\n",
" Hint: Extract the data you need and share that instead.",
call. = FALSE)
}
# Check for external pointers
if (typeof(x) == "externalptr") {
stop("Cannot share external pointers.\n",
" Found at: ", path, "\n",
" External pointers reference memory that cannot be serialized.",
call. = FALSE)
}
# Check for connections
if (inherits(x, "connection")) {
stop("Cannot share connection objects.\n",
" Found at: ", path, "\n",
" Connections cannot be serialized.",
call. = FALSE)
}
# Recursively check environments
if (is.environment(x)) {
# Skip base/global/empty environments (these serialize fine)
if (identical(x, baseenv()) ||
identical(x, globalenv()) ||
identical(x, emptyenv())) {
return(invisible(NULL))
}
# Check contents of user environments
for (nm in names(x)) {
validate_serializable(x[[nm]], paste0(path, "$", nm), seen = seen)
}
}
# Recursively check lists
if (is.list(x) && !is.data.frame(x)) {
nms <- names(x)
for (i in seq_along(x)) {
item_path <- if (!is.null(nms) && nzchar(nms[i])) {
paste0(path, "$", nms[i])
} else {
paste0(path, "[[", i, "]]")
}
validate_serializable(x[[i]], item_path, seen = seen)
}
}
# Recursively check S4 slots
if (isS4(x)) {
slot_names <- methods::slotNames(x)
for (sn in slot_names) {
validate_serializable(methods::slot(x, sn), paste0(path, "@", sn), seen = seen)
}
}
invisible(NULL)
}
# Helper: get object identity (memory address) for alias detection
object_identity <- function(x) {
# Prefer a stable, API-safe address token from C. This avoids using
# .Internal(inspect()), which is not part of R's public API.
tryCatch(
.Call("C_shard_obj_address", x, PACKAGE = "shard"),
error = function(e) paste0("obj_", sample.int(1e9, 1))
)
}
# Helper: check if object is a shareable atomic type
is_shareable_atomic <- function(x) {
# Shareable: numeric vectors (double, integer, complex), logical, raw
# Not shareable: character, lists, environments, functions, etc.
if (is.atomic(x) && !is.null(x)) {
mode <- typeof(x)
# Complex is intentionally excluded: shard ALTREP currently supports
# double/integer/logical/raw only.
return(mode %in% c("double", "integer", "logical", "raw"))
}
FALSE
}
#' Deep Sharing Hook for Custom Classes
#'
#' S3 generic that allows classes to customize deep sharing behavior.
#' Override this for your class to control which slots/elements are traversed,
#' force sharing of small objects, or transform objects before traversal.
#'
#' @param x The object being traversed during deep sharing.
#' @param ctx A context list containing:
#' \describe{
#' \item{path}{Current node path string (e.g., \code{"<root>$data@cache"})}
#' \item{class}{class(x) - the object's class vector}
#' \item{mode}{'strict' or 'balanced' - sharing mode}
#' \item{min_bytes}{Minimum size threshold for sharing}
#' \item{types}{Character vector of enabled types for sharing}
#' \item{deep}{Logical, always TRUE when hook is called}
#' }
#'
#' @return A list with optional fields:
#' \describe{
#' \item{skip_slots}{Character vector of S4 slot names to not traverse}
#' \item{skip_paths}{Character vector of paths to not traverse}
#' \item{force_share_paths}{Character vector of paths to force share (ignore min_bytes)}
#' \item{rewrite}{Function(x) -> x to transform object before traversal}
#' }
#' Return an empty list for default behavior (no customization).
#'
#' @export
#' @examples
#' \donttest{
#' shard_share_hook.MyModelClass <- function(x, ctx) {
#' list(
#' skip_slots = "cache",
#' force_share_paths = paste0(ctx$path, "@coefficients")
#' )
#' }
#'
#' shard_share_hook.LazyData <- function(x, ctx) {
#' list(
#' rewrite = function(obj) {
#' obj$data <- as.matrix(obj$data)
#' obj
#' }
#' )
#' }
#' }
shard_share_hook <- function(x, ctx) {
UseMethod("shard_share_hook")
}
#' @export
#' @rdname shard_share_hook
shard_share_hook.default <- function(x, ctx) {
# Default: no customization, traverse normally
list()
}
# Internal: build context object for hooks
build_hook_context <- function(x, path, mode, min_bytes, types, deep) {
list(
path = path,
class = class(x),
mode = mode,
min_bytes = min_bytes,
types = types,
deep = deep
)
}
# Internal: call hook safely with error handling based on mode
call_hook_safe <- function(x, ctx, mode) {
result <- tryCatch(
shard_share_hook(x, ctx),
error = function(e) {
if (mode == "strict") {
stop("Hook error in strict mode.\n",
" Path: ", ctx$path, "\n",
" Class: ", paste(ctx$class, collapse = ", "), "\n",
" Error: ", conditionMessage(e),
call. = FALSE)
}
# In balanced mode, return special marker indicating hook failure
structure(
list(hook_error = conditionMessage(e)),
class = "shard_hook_error"
)
}
)
# Validate hook return value
if (!is.list(result)) {
if (mode == "strict") {
stop("Hook must return a list.\n",
" Path: ", ctx$path, "\n",
" Class: ", paste(ctx$class, collapse = ", "), "\n",
" Got: ", typeof(result),
call. = FALSE)
}
return(structure(
list(hook_error = "Hook did not return a list"),
class = "shard_hook_error"
))
}
result
}
# Internal: check if path should be skipped based on hook result
should_skip_path <- function(current_path, hook_result) {
if (is.null(hook_result$skip_paths)) return(FALSE)
current_path %in% hook_result$skip_paths
}
# Internal: check if path should be force-shared based on hook result
should_force_share_path <- function(current_path, hook_result) {
if (is.null(hook_result$force_share_paths)) return(FALSE)
current_path %in% hook_result$force_share_paths
}
# Internal: traverse object for deep sharing with memoization and hooks
# Returns a structure with shared segments and alias information
share_deep_traverse <- function(x,
env,
path = "<root>",
depth = 0,
min_bytes = 64 * 1024 * 1024,
backing = "auto",
readonly = TRUE,
max_depth = Inf,
cycle_policy = "error",
mode = "balanced",
types = c("double", "integer", "logical", "raw", "complex"),
hook_result = NULL) {
env_has_self_cycle_ <- function(e) {
# R lists don't form true cycles, but environments can via bindings or
# parent.env(e) == e. We don't deep-traverse environments yet, but we
# should fail fast on obvious self-cycles to avoid pathological cases.
if (!is.environment(e)) return(FALSE)
if (identical(parent.env(e), e)) return(TRUE)
nms <- ls(envir = e, all.names = TRUE)
if (length(nms) == 0) return(FALSE)
vals <- mget(nms, envir = e, inherits = FALSE)
for (v in vals) {
if (is.environment(v) && identical(v, e)) return(TRUE)
}
FALSE
}
# Handle environment objects based on mode
if (is.environment(x) && !identical(x, baseenv()) &&
!identical(x, globalenv()) && !identical(x, emptyenv())) {
if (mode == "strict") {
stop("Cannot share: contains environment.\n",
" Found at: ", path, "\n",
" Hint: Use mode='balanced' to skip environment slots.",
call. = FALSE)
}
if (cycle_policy == "error" && env_has_self_cycle_(x)) {
stop("Cycle detected during deep sharing.\n",
" Path: ", path, "\n",
" Cause: environment self-reference.\n",
" Hint: Use cycle='skip' to skip cyclic references instead.",
call. = FALSE)
}
# In balanced mode, keep environments as-is
identity <- object_identity(x)
return(structure(
list(
type = "kept",
path = path,
node_id = env$next_id + 1,
identity = identity,
value = x
),
class = "shard_deep_kept"
))
}
# Get object identity for memoization and cycle detection
identity <- object_identity(x)
# Check for cycle (currently in traversal stack)
if (exists(identity, envir = env$in_progress, inherits = FALSE)) {
if (cycle_policy == "error") {
stop("Cycle detected during deep sharing.\n",
" Path: ", path, "\n",
" Object is self-referential.\n",
" Use cycle='skip' to skip cyclic references instead.",
call. = FALSE)
}
# cycle='skip': return a marker indicating cycle
return(structure(
list(
type = "cycle",
path = path,
identity = identity
),
class = "shard_deep_cycle"
))
}
# Check for alias (already seen and shared)
if (exists(identity, envir = env$seen, inherits = FALSE)) {
original <- get(identity, envir = env$seen, inherits = FALSE)
return(structure(
list(
type = "alias",
path = path,
identity = identity,
alias_of = original$path,
alias_node_id = original$node_id,
shared = original$shared,
value = original$value # For aliases to kept values
),
class = "shard_deep_alias"
))
}
# Mark as in-progress for cycle detection
original_identity <- identity # Save for cleanup
assign(identity, TRUE, envir = env$in_progress)
on.exit({
if (exists(original_identity, envir = env$in_progress, inherits = FALSE)) {
rm(list = original_identity, envir = env$in_progress)
}
}, add = TRUE)
# Assign a node ID for tracking
env$next_id <- env$next_id + 1
node_id <- env$next_id
# Call hook for this object
# Build context and call hook
ctx <- build_hook_context(x, path, mode, min_bytes, types, TRUE)
current_hook_result <- call_hook_safe(x, ctx, mode)
# Check for hook error
if (inherits(current_hook_result, "shard_hook_error")) {
# In balanced mode, continue with default behavior but track error
env$hook_errors <- c(env$hook_errors, list(list(
path = path,
error = current_hook_result$hook_error
)))
current_hook_result <- list() # Use empty hook result
}
# Store hook result in environment for child access
if (length(current_hook_result) > 0) {
env$active_hooks[[path]] <- current_hook_result
}
# Apply rewrite function if provided
if (!is.null(current_hook_result$rewrite) && is.function(current_hook_result$rewrite)) {
x <- tryCatch(
current_hook_result$rewrite(x),
error = function(e) {
if (mode == "strict") {
stop("Rewrite function error in strict mode.\n",
" Path: ", path, "\n",
" Error: ", conditionMessage(e),
call. = FALSE)
}
env$hook_errors <- c(env$hook_errors, list(list(
path = path,
error = paste("rewrite error:", conditionMessage(e))
)))
x # Return original on error in balanced mode
}
)
# Re-get identity after rewrite (object may have changed)
identity <- object_identity(x)
}
# Check force_share from parent hooks as well
force_share <- should_force_share_path(path, current_hook_result)
if (!force_share) {
# Check parent hook results for force_share_paths
for (hook_path in names(env$active_hooks)) {
if (should_force_share_path(path, env$active_hooks[[hook_path]])) {
force_share <- TRUE
break
}
}
}
# Check for registered adapter before default traversal
adapter <- shard_get_adapter(x)
if (!is.null(adapter) && depth < max_depth) {
# Use adapter to get children
adapter_children <- adapter$children(x)
if (!is.list(adapter_children)) {
stop("Adapter children() must return a list for class '",
adapter$class, "'.", call. = FALSE)
}
# Process each child from the adapter
nms <- names(adapter_children)
children <- vector("list", length(adapter_children))
names(children) <- nms
path_prefix <- if (!is.null(adapter$path_prefix)) {
adapter$path_prefix
} else {
paste0("@", adapter$class)
}
for (i in seq_along(adapter_children)) {
child_path <- if (!is.null(nms) && nzchar(nms[i])) {
paste0(path, path_prefix, "$", nms[i])
} else {
paste0(path, path_prefix, "[[", i, "]]")
}
children[[i]] <- share_deep_traverse(
adapter_children[[i]], env, child_path, depth + 1,
min_bytes, backing, readonly, max_depth, cycle_policy,
mode, types, current_hook_result
)
}
# Record in seen table
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL
), envir = env$seen)
return(structure(
list(
type = "container",
container_type = "adapter",
adapter_class = adapter$class,
path = path,
node_id = node_id,
identity = identity,
children = children,
original_value = x,
adapter = adapter,
original_class = class(x),
original_names = nms
),
class = "shard_deep_container"
))
}
# Determine what to do with this object
should_share <- is_shareable_atomic(x) &&
(force_share || utils::object.size(x) >= min_bytes)
if (should_share) {
# Share this atomic object
shared <- share(x, backing = backing, readonly = readonly)
# Record in seen table for alias detection
assign(identity, list(
path = path,
node_id = node_id,
shared = shared
), envir = env$seen)
# Track the shared segment for cleanup
env$segments <- c(env$segments, list(shared))
return(structure(
list(
type = "shared",
path = path,
node_id = node_id,
identity = identity,
shared = shared,
forced = force_share
),
class = "shard_deep_node"
))
} else if (isS4(x) && depth < max_depth) {
# Process S4 object slots
all_slot_names <- methods::slotNames(x)
# Get skipped slots from hook
skipped_slots <- current_hook_result$skip_slots
children <- vector("list", length(all_slot_names))
names(children) <- all_slot_names
for (sn in all_slot_names) {
child_path <- paste0(path, "@", sn)
slot_val <- methods::slot(x, sn)
# Check if this slot should be skipped (either by skip_slots or skip_paths)
slot_skipped <- sn %in% skipped_slots ||
should_skip_path(child_path, current_hook_result)
if (slot_skipped) {
# Keep the slot value as-is (don't traverse)
children[[sn]] <- structure(
list(
type = "kept",
path = child_path,
node_id = env$next_id + 1,
identity = object_identity(slot_val),
value = slot_val,
skipped_by_hook = TRUE
),
class = "shard_deep_kept"
)
env$next_id <- env$next_id + 1
} else {
children[[sn]] <- share_deep_traverse(
slot_val, env, child_path, depth + 1,
min_bytes, backing, readonly, max_depth, cycle_policy,
mode, types, current_hook_result
)
}
}
# Record in seen table
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL
), envir = env$seen)
return(structure(
list(
type = "container",
container_type = "S4",
path = path,
node_id = node_id,
identity = identity,
children = children,
original_class = class(x),
skipped_slots = skipped_slots
),
class = "shard_deep_container"
))
} else if (is.list(x) && !is.data.frame(x) && depth < max_depth) {
# Recursively process list elements
nms <- names(x)
children <- vector("list", length(x))
names(children) <- nms
for (i in seq_along(x)) {
child_path <- if (!is.null(nms) && nzchar(nms[i])) {
paste0(path, "$", nms[i])
} else {
paste0(path, "[[", i, "]]")
}
# Check if this path should be skipped
if (should_skip_path(child_path, current_hook_result)) {
children[[i]] <- structure(
list(
type = "kept",
path = child_path,
node_id = env$next_id + 1,
identity = object_identity(x[[i]]),
value = x[[i]],
skipped_by_hook = TRUE
),
class = "shard_deep_kept"
)
env$next_id <- env$next_id + 1
} else {
children[[i]] <- share_deep_traverse(
x[[i]], env, child_path, depth + 1,
min_bytes, backing, readonly, max_depth, cycle_policy,
mode, types, current_hook_result
)
}
}
# Record in seen table
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL
), envir = env$seen)
return(structure(
list(
type = "container",
container_type = "list",
path = path,
node_id = node_id,
identity = identity,
children = children,
original_class = class(x),
original_names = nms
),
class = "shard_deep_container"
))
} else if (is.data.frame(x) && depth < max_depth) {
# Process data.frame columns
children <- vector("list", ncol(x))
names(children) <- names(x)
for (i in seq_along(x)) {
child_path <- paste0(path, "$", names(x)[i])
# Check if this path should be skipped
if (should_skip_path(child_path, current_hook_result)) {
children[[i]] <- structure(
list(
type = "kept",
path = child_path,
node_id = env$next_id + 1,
identity = object_identity(x[[i]]),
value = x[[i]],
skipped_by_hook = TRUE
),
class = "shard_deep_kept"
)
env$next_id <- env$next_id + 1
} else {
children[[i]] <- share_deep_traverse(
x[[i]], env, child_path, depth + 1,
min_bytes, backing, readonly, max_depth, cycle_policy,
mode, types, current_hook_result
)
}
}
# Record in seen table
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL
), envir = env$seen)
return(structure(
list(
type = "container",
container_type = "data.frame",
path = path,
node_id = node_id,
identity = identity,
children = children,
original_class = class(x),
original_names = names(x),
original_nrow = nrow(x),
original_row_names = attr(x, "row.names")
),
class = "shard_deep_container"
))
} else if (isS4(x) && depth < max_depth) {
# S4 object: generic slot traversal (no adapter registered)
slot_names <- methods::slotNames(x)
if (length(slot_names) > 0) {
children <- vector("list", length(slot_names))
names(children) <- slot_names
for (i in seq_along(slot_names)) {
sn <- slot_names[i]
child_path <- paste0(path, "@", sn)
children[[i]] <- share_deep_traverse(
methods::slot(x, sn), env, child_path, depth + 1,
min_bytes, backing, readonly, max_depth, cycle_policy
)
}
# Record in seen table
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL
), envir = env$seen)
return(structure(
list(
type = "container",
container_type = "s4",
path = path,
node_id = node_id,
identity = identity,
children = children,
original_value = x,
original_class = class(x),
original_names = slot_names
),
class = "shard_deep_container"
))
}
# S4 with no slots falls through to kept
}
# Keep as-is (small atomic, unshareable type, max depth reached, or S4 with no slots)
# Record in seen table (including value for alias reconstruction)
assign(identity, list(
path = path,
node_id = node_id,
shared = NULL,
value = x
), envir = env$seen)
return(structure(
list(
type = "kept",
path = path,
node_id = node_id,
identity = identity,
value = x
),
class = "shard_deep_kept"
))
}
# Internal: reconstruct object from deep shared structure
fetch_deep_reconstruct <- function(node) {
if (inherits(node, "shard_deep_node")) {
# Shared leaf - fetch and materialize so reconstruction yields ordinary
# R objects (important for S4 slot assignment and for equality checks
# against the original, non-shared objects).
return(materialize(fetch(node$shared)))
} else if (inherits(node, "shard_deep_alias")) {
# Alias - fetch from the original shared segment or return kept value
if (!is.null(node$shared)) {
return(materialize(fetch(node$shared)))
} else {
# Alias to a kept value - return the stored value
return(node$value)
}
} else if (inherits(node, "shard_deep_cycle")) {
# Cycle marker - return NULL or special marker
# This shouldn't normally be reached if cycle='error'
return(NULL)
} else if (inherits(node, "shard_deep_kept")) {
# Kept value - return as-is
return(node$value)
} else if (inherits(node, "shard_deep_container")) {
# Container - reconstruct recursively
children <- lapply(node$children, fetch_deep_reconstruct)
if (node$container_type == "adapter") {
# Use adapter's replace function to reconstruct
result <- node$adapter$replace(node$original_value, children)
} else if (node$container_type == "s4") {
# Reconstruct S4 object by setting slots
result <- node$original_value # Start with a copy
for (sn in names(children)) {
methods::slot(result, sn) <- children[[sn]]
}
} else if (node$container_type == "data.frame") {
# Reconstruct data.frame
result <- structure(
children,
class = node$original_class,
names = node$original_names,
row.names = node$original_row_names
)
} else if (node$container_type == "S4") {
# Reconstruct S4 object
# Create new instance of the class
result <- methods::new(node$original_class)
# Set each slot
for (sn in names(children)) {
methods::slot(result, sn) <- children[[sn]]
}
} else {
# Reconstruct list
result <- children
if (!is.null(node$original_names)) {
names(result) <- node$original_names
}
if (!identical(node$original_class, "list")) {
class(result) <- node$original_class
}
}
return(result)
} else {
# Unknown type - return as-is
return(node)
}
}
#' Share an R Object for Parallel Access
#'
#' Creates a shared memory representation of an R object. The object is
#' serialized once and can be accessed by multiple worker processes without
#' copying.
#'
#' @param x An R object to share. Supports vectors, matrices, arrays, lists,
#' data frames, and any object that can be serialized with \code{serialize()}.
#' @param backing Backing type: "auto" (default), "mmap", or "shm".
#' \itemize{
#' \item \code{"auto"}: Let the system choose the best option.
#' \item \code{"mmap"}: File-backed memory mapping. Most portable.
#' \item \code{"shm"}: POSIX shared memory or Windows named mapping.
#' }
#' @param readonly Logical. If TRUE (default), the segment is protected after
#' writing, making it read-only. Set to FALSE only if you need to modify
#' the shared data (advanced use case).
#' @param name Optional name for the shared object. If NULL (default), a unique
#' name is generated. Named shares can be opened by name in other processes.
#' @param deep Logical. If TRUE, recursively traverse lists and data.frames,
#' sharing individual components that meet the size threshold. When FALSE
#' (default), the entire object is serialized as one unit.
#' @param min_bytes Minimum size in bytes for an object to be shared when
#' deep=TRUE. Objects smaller than this threshold are kept in-place.
#' Default is 64MB (64 * 1024 * 1024).
#' @param cycle How to handle cyclic references when deep=TRUE. Either "error"
#' (default) to stop with an error, or "skip" to skip cyclic references.
#' @param mode Sharing mode when deep=TRUE. Either "balanced" (default) to
#' continue on hook errors and non-shareable types, or "strict" to error.
#'
#' @return A \code{shard_shared} object (when deep=FALSE) or
#' \code{shard_deep_shared} object (when deep=TRUE) containing:
#' \itemize{
#' \item \code{path}: The path or name of the shared segment
#' \item \code{backing}: The backing type used
#' \item \code{size}: Total size in bytes
#' \item \code{readonly}: Whether the segment is protected
#' \item \code{class_info}: Original class information
#' }
#'
#' @export
#' @examples
#' \donttest{
#' mat <- matrix(rnorm(1e4), nrow = 100)
#' shared_mat <- share(mat)
#' recovered <- fetch(shared_mat)
#' identical(mat, recovered)
#' close(shared_mat)
#' }
share <- function(x,
backing = c("auto", "mmap", "shm"),
readonly = TRUE,
name = NULL,
deep = FALSE,
min_bytes = 64 * 1024 * 1024,
cycle = c("error", "skip"),
mode = c("balanced", "strict")) {
backing <- match.arg(backing)
cycle <- match.arg(cycle)
mode <- match.arg(mode)
# Validate input is serializable
validate_serializable(x)
# Shareable types
shareable_types <- c("double", "integer", "logical", "raw", "complex")
# Deep sharing: traverse structure and share components individually
if (deep) {
# Create environment for memoization state
env <- new.env(parent = emptyenv())
env$seen <- new.env(parent = emptyenv()) # identity -> {path, node_id, shared}
env$in_progress <- new.env(parent = emptyenv()) # identity -> TRUE (cycle detection)
env$segments <- list() # All created segments for cleanup
env$next_id <- 0 # Node ID counter
env$hook_errors <- list() # Track hook errors in balanced mode
env$active_hooks <- list() # Active hook results by path
# Traverse and share
structure_tree <- share_deep_traverse(
x, env, "<root>", 0,
min_bytes, backing, readonly, Inf, cycle,
mode, shareable_types, NULL
)
# Count shared vs aliased
shared_count <- 0
alias_count <- 0
cycle_count <- 0
kept_count <- 0
total_shared_bytes <- 0
count_nodes <- function(node) {
if (inherits(node, "shard_deep_node")) {
shared_count <<- shared_count + 1
if (inherits(node$shared, "shard_shared")) {
total_shared_bytes <<- total_shared_bytes + node$shared$size
} else if (is_shared_vector(node$shared)) {
elem_size <- switch(typeof(node$shared),
"integer" = 4L,
"double" = 8L,
"logical" = 4L,
"raw" = 1L,
NA_integer_
)
if (!is.na(elem_size)) {
total_shared_bytes <<- total_shared_bytes + length(node$shared) * elem_size
}
}
} else if (inherits(node, "shard_deep_alias")) {
alias_count <<- alias_count + 1
} else if (inherits(node, "shard_deep_cycle")) {
cycle_count <<- cycle_count + 1
} else if (inherits(node, "shard_deep_kept")) {
kept_count <<- kept_count + 1
} else if (inherits(node, "shard_deep_container")) {
for (child in node$children) {
count_nodes(child)
}
}
}
count_nodes(structure_tree)
return(structure(
list(
tree = structure_tree,
segments = env$segments,
backing = backing,
readonly = readonly,
mode = mode,
summary = list(
shared_count = shared_count,
alias_count = alias_count,
cycle_count = cycle_count,
kept_count = kept_count,
total_shared_bytes = total_shared_bytes,
hook_error_count = length(env$hook_errors),
hook_errors = env$hook_errors
),
class_info = list(
type = if (is.data.frame(x)) "data.frame"
else if (is.list(x)) "list"
else if (isS4(x)) "S4"
else "other",
deep = TRUE
)
),
class = "shard_deep_shared"
))
}
# Fast path: shareable atomic vectors/matrices/arrays become ALTREP-backed
# shared vectors. This avoids per-worker serialization of large inputs.
if (is.atomic(x) && !is.null(x) &&
typeof(x) %in% c("double", "integer", "logical", "raw") &&
!is_shared_vector(x)) {
cow <- if (isTRUE(readonly)) "deny" else "allow"
# Build with cow='allow' so we can attach attributes, then lock down
# by setting shard_cow to the requested policy.
shared <- as_shared(x, readonly = readonly, backing = backing, cow = "allow")
# Preserve non-class attributes (dim, dimnames, names, tsp, etc).
attrs <- attributes(x)
x_class <- attr(x, "class")
attrs$class <- NULL
if (length(attrs)) {
for (nm in names(attrs)) {
attr(shared, nm) <- attrs[[nm]]
}
}
# Preserve any underlying class (e.g., Date) behind the wrapper class.
class(shared) <- unique(c("shard_shared_vector", x_class))
attr(shared, "shard_cow") <- cow
return(shared)
}
# Standard (non-deep) sharing: serialize entire object
# Serialize the object (with tryCatch for edge cases)
serialized <- tryCatch(
serialize(x, connection = NULL, xdr = FALSE),
error = function(e) {
stop("Cannot share object: serialization failed.\n",
"Reason: ", conditionMessage(e), "\n",
"Objects containing closures, external pointers, or ",
"certain connection types cannot be shared.",
call. = FALSE)
}
)
size <- length(serialized)
# Create segment with enough space for the serialized data
seg <- segment_create(size, backing = backing, path = name, readonly = FALSE)
# Write serialized data
segment_write(seg, serialized, offset = 0)
# Protect if requested
if (readonly) {
segment_protect(seg)
}
# Get segment info for the handle
info <- segment_info(seg)
# Extract class information for better deserialization hints
class_info <- if (is.matrix(x)) {
list(type = "matrix", dim = dim(x), mode = mode(x))
} else if (is.array(x)) {
list(type = "array", dim = dim(x), mode = mode(x))
} else if (is.data.frame(x)) {
list(type = "data.frame", nrow = nrow(x), ncol = ncol(x))
} else if (is.list(x)) {
list(type = "list", length = length(x))
} else {
list(type = "vector", length = length(x), mode = mode(x))
}
structure(
list(
segment = seg,
path = info$path,
backing = info$backing,
size = info$size,
readonly = readonly,
class_info = class_info
),
class = "shard_shared"
)
}
#' Fetch Data from a Shared Object
#'
#' Retrieves the R object from shared memory by deserializing it. This is the
#' primary way to access shared data in workers.
#'
#' @param x A \code{shard_shared} object.
#' @param ... Ignored.
#'
#' @return The original R object that was shared.
#'
#' @details
#' When called in the main process, this reads from the existing segment.
#' When called in a worker process, this opens the segment by path and
#' deserializes the data.
#'
#' The \code{fetch()} function is the primary way to access shared data.
#' It can also be called as \code{materialize()} for compatibility.
#'
#' @export
#' @examples
#' \donttest{
#' x <- 1:100
#' shared <- share(x)
#' recovered <- fetch(shared)
#' identical(x, recovered)
#' close(shared)
#' }
fetch <- function(x, ...) {
UseMethod("fetch")
}
#' @rdname fetch
#' @export
fetch.shard_shared <- function(x, ...) {
# Try using existing segment pointer (main process scenario)
# Wrap in tryCatch because pointer becomes invalid after serialization to workers
if (!is.null(x$segment) && inherits(x$segment, "shard_segment")) {
result <- tryCatch({
raw_data <- segment_read(x$segment, offset = 0, size = x$size)
unserialize(raw_data)
}, error = function(e) NULL)
if (!is.null(result)) {
return(result)
}
# Pointer was invalid (e.g., in worker process), fall through to reopen
}
# Open by path (worker process scenario)
if (is.null(x$path)) {
stop("Cannot fetch: no path available", call. = FALSE)
}
seg <- segment_open(x$path, backing = x$backing, readonly = TRUE)
on.exit(segment_close(seg, unlink = FALSE))
raw_data <- segment_read(seg, offset = 0, size = x$size)
unserialize(raw_data)
}
#' @rdname fetch
#' @export
fetch.shard_deep_shared <- function(x, ...) {
# Reconstruct the object from the shared structure tree
fetch_deep_reconstruct(x$tree)
}
#' @rdname fetch
#' @export
fetch.default <- function(x, ...) {
x
}
#' Materialize Shared Object
#'
#' Alias for \code{fetch()}. Retrieves the R object from shared memory.
#'
#' @param x A \code{shard_shared} object.
#' @return The original R object.
#' @export
#' @examples
#' \donttest{
#' shared <- share(1:100)
#' data <- materialize(shared)
#' close(shared)
#' }
materialize <- function(x) {
UseMethod("materialize")
}
#' @rdname materialize
#' @export
materialize.shard_shared <- function(x) {
fetch.shard_shared(x)
}
#' @rdname materialize
#' @export
materialize.default <- function(x) {
# Handle shard ALTREP vectors (from shared_vector/as_shared)
if (is_shared_vector(x)) {
out <- .Call("C_shard_altrep_materialize", x, PACKAGE = "shard")
# Restore non-shard attributes (dim, dimnames, names, and any original
# underlying class like Date/POSIXct behind the wrapper class).
attrs <- attributes(x)
if (!is.null(attrs)) {
attrs$shard_cow <- NULL
attrs$shard_readonly <- NULL
cls <- class(x)
cls2 <- setdiff(cls, "shard_shared_vector")
attrs$class <- if (length(cls2)) cls2 else NULL
attributes(out) <- attrs
}
return(out)
}
x
}
#' Close a Shared Object
#'
#' Releases the shared memory segment. After closing, the shared object can
#' no longer be accessed.
#'
#' @param con A \code{shard_shared} object.
#' @param ... Ignored.
#'
#' @return NULL (invisibly).
#' @export
#' @method close shard_shared
close.shard_shared <- function(con, ...) {
if (!is.null(con$segment)) {
segment_close(con$segment, unlink = TRUE)
}
invisible(NULL)
}
#' @rdname close.shard_shared
#' @export
#' @method close shard_shared_vector
close.shard_shared_vector <- function(con, ...) {
if (!is_shared_vector(con)) {
stop("con must be a shard ALTREP vector", call. = FALSE)
}
seg <- shared_segment(con)
segment_close(seg)
invisible(NULL)
}
#' @rdname close.shard_shared
#' @export
#' @method close shard_deep_shared
close.shard_deep_shared <- function(con, ...) {
# Close all shared segments
for (seg in con$segments) {
tryCatch(
close(seg),
error = function(e) NULL
)
}
invisible(NULL)
}
#' Check if Object is Shared
#'
#' @param x An object to check.
#' @return A logical scalar: \code{TRUE} if \code{x} is a shared object,
#' \code{FALSE} otherwise.
#' @export
#' @examples
#' is_shared(1:10)
#' \donttest{
#' shared <- share(1:10)
#' is_shared(shared)
#' close(shared)
#' }
is_shared <- function(x) {
inherits(x, c("shard_shared", "shard_deep_shared", "shard_shared_vector")) ||
(is.atomic(x) && is_shared_vector(x))
}
#' Get Information About a Shared Object
#'
#' @param x A \code{shard_shared} object.
#' @return A named list with fields \code{path}, \code{backing}, \code{size},
#' \code{readonly}, \code{class_info}, and \code{segment_info}.
#' @export
#' @examples
#' \donttest{
#' shared <- share(1:100)
#' shared_info(shared)
#' close(shared)
#' }
shared_info <- function(x) {
if (inherits(x, "shard_shared")) {
return(list(
path = x$path,
backing = x$backing,
size = x$size,
readonly = x$readonly,
class_info = x$class_info,
segment_info = if (!is.null(x$segment)) segment_info(x$segment) else NULL
))
}
if (is.atomic(x) && is_shared_vector(x)) {
seg <- shared_segment(x)
sinfo <- .Call("C_shard_segment_info", seg$ptr, PACKAGE = "shard")
class_info <- if (is.matrix(x)) {
list(type = "matrix", dim = dim(x), mode = typeof(x))
} else if (is.array(x) && length(dim(x)) > 1L) {
list(type = "array", dim = dim(x), mode = typeof(x))
} else {
list(type = "vector", length = length(x), mode = typeof(x))
}
return(list(
path = sinfo$path,
backing = sinfo$backing,
size = sinfo$size,
readonly = sinfo$readonly,
class_info = class_info,
segment_info = sinfo
))
}
stop("shared_info() expects a shard_shared object or shard shared vector.",
call. = FALSE)
}
#' Print a Shared Object
#'
#' @param x A \code{shard_shared} object.
#' @param ... Ignored.
#' @return The input \code{x}, invisibly.
#' @export
#' @examples
#' \donttest{
#' shared <- share(1:10)
#' print(shared)
#' close(shared)
#' }
print.shard_shared <- function(x, ...) {
cat("<shard_shared>\n")
cat(" Path:", x$path, "\n")
cat(" Size:", format(x$size, big.mark = ","), "bytes\n")
cat(" Backing:", x$backing, "\n")
cat(" Read-only:", x$readonly, "\n")
info <- x$class_info
if (info$type == "matrix") {
cat(" Original: matrix [", info$dim[1], " x ", info$dim[2], "], ",
info$mode, "\n", sep = "")
} else if (info$type == "array") {
cat(" Original: array [", paste(info$dim, collapse = " x "), "], ",
info$mode, "\n", sep = "")
} else if (info$type == "data.frame") {
cat(" Original: data.frame [", info$nrow, " x ", info$ncol, "]\n", sep = "")
} else if (info$type == "list") {
cat(" Original: list [", info$length, " elements]\n", sep = "")
} else {
cat(" Original: ", info$mode, " vector [", info$length, "]\n", sep = "")
}
invisible(x)
}
#' Print a Deep-Shared Object
#'
#' @param x A \code{shard_deep_shared} object.
#' @param ... Ignored.
#' @return The input \code{x}, invisibly.
#' @export
#' @examples
#' \donttest{
#' lst <- list(a = 1:10, b = 11:20)
#' shared <- share(lst, deep = TRUE, min_bytes = 1)
#' print(shared)
#' close(shared)
#' }
print.shard_deep_shared <- function(x, ...) {
cat("<shard_deep_shared>\n")
cat(" Type:", x$class_info$type, "\n")
cat(" Backing:", x$backing, "\n")
cat(" Read-only:", x$readonly, "\n")
cat("\n")
cat(" Summary:\n")
cat(" Shared segments:", x$summary$shared_count, "\n")
cat(" Aliased references:", x$summary$alias_count, "\n")
if (x$summary$cycle_count > 0) {
cat(" Cyclic references (skipped):", x$summary$cycle_count, "\n")
}
cat(" Kept in-place:", x$summary$kept_count, "\n")
cat(" Total shared bytes:",
format(x$summary$total_shared_bytes, big.mark = ","), "\n")
invisible(x)
}
#' Open an Existing Shared Object by Path
#'
#' Opens a shared object that was created by another process. This is useful
#' for workers that need to attach to shared data without having the original
#' \code{shard_shared} object.
#'
#' @param path Path to the shared segment.
#' @param backing Backing type: "mmap" or "shm".
#' @param size Size of the segment in bytes. If NULL, attempts to detect.
#'
#' @return A \code{shard_shared} object attached to the existing segment.
#' @export
#' @examples
#' \donttest{
#' shared <- share(1:50)
#' info <- shared_info(shared)
#' reopened <- share_open(info$path, backing = "mmap")
#' close(reopened)
#' close(shared)
#' }
share_open <- function(path, backing = c("mmap", "shm"), size = NULL) {
backing <- match.arg(backing)
seg <- segment_open(path, backing = backing, readonly = TRUE)
info <- segment_info(seg)
if (is.null(size)) {
size <- info$size
}
structure(
list(
segment = seg,
path = info$path,
backing = info$backing,
size = size,
readonly = TRUE,
class_info = list(type = "unknown")
),
class = "shard_shared"
)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.