Nothing
#' @title Parallel Execution with shard_map
#' @description Core parallel execution engine with supervision, shared inputs, and output buffers.
#' @name shard_map
NULL
#' Parallel Shard Execution
#'
#' Executes a function over shards in parallel with worker supervision,
#' shared inputs, and explicit output buffers. This is the primary entry
#' point for shard's parallel execution model.
#'
#' @param shards A `shard_descriptor` from [shards()], or an integer N to
#' auto-generate shards.
#' @param fun Function to execute per shard. Receives the shard descriptor
#' as first argument, followed by borrowed inputs and outputs. You can also
#' select a registered kernel via `kernel=` instead of providing `fun=`.
#' @param borrow Named list of shared inputs. These are exported to workers
#' once and reused across shards. Treated as read-only by default.
#' @param out Named list of output buffers (from `buffer()`). Workers write
#' results directly to these buffers.
#' @param kernel Optional. Name of a registered kernel (see [list_kernels()]).
#' If provided, `fun` must be NULL.
#' @param scheduler_policy Optional list of scheduling hints (advanced). Currently:
#' - `max_huge_concurrency`: cap concurrent chunks whose kernel footprint is
#' classified as `"huge"` (see [register_kernel()]).
#' @param autotune Optional. Online autotuning for scalar-N sharding (advanced).
#' When `shards` is an integer `N`, shard_map can adjust shard block sizes over
#' time based on observed wall time and worker RSS.
#'
#' Accepted values:
#' - `NULL` (default): enable online autotuning for `shard_map(N, ...)`, off for
#' precomputed shard descriptors.
#' - `TRUE` / `"online"`: force online autotuning (only applies when `shards` is
#' an integer `N`).
#' - `FALSE` / `"none"`: disable autotuning.
#' - a list: `list(mode="online", max_rounds=..., probe_shards_per_worker=..., min_shard_time=...)`
#' @param dispatch_mode Dispatch mode (advanced). `"rpc_chunked"` is the default
#' supervised socket-based dispatcher. `"shm_queue"` is an opt-in fast mode
#' that uses a shared-memory task queue to reduce per-task overhead for tiny
#' tasks. In v1, `"shm_queue"` is only supported for `shard_map(N, ...)` with
#' `chunk_size=1` and is intended for out-buffer/sink workflows (results are
#' not gathered).
#' @param dispatch_opts Optional list of dispatch-mode specific knobs (advanced).
#' Currently:
#' - For `dispatch_mode="rpc_chunked"`:
#' - `auto_table`: logical. If TRUE, shard_map treats data.frame/tibble return
#' values as row-group outputs and writes them to a table sink
#' automatically (one partition per shard id). This avoids building a large
#' list of tibbles and calling bind_rows() on the master. Requires `out=`
#' to be empty (use explicit `out=list(sink=table_sink(...))` otherwise).
#' - `auto_table_materialize`: `"never"`, `"auto"`, or `"always"` (default `"auto"`).
#' - `auto_table_max_bytes`: numeric/integer. For `"auto"`, materialize only
#' if estimated output size <= this threshold (default 256MB).
#' - `auto_table_mode`: `"row_groups"` (default) or `"partitioned"`.
#' - `auto_table_path`: optional output directory (default tempdir()).
#' - `auto_table_format`: `"auto"`, `"rds"` (default), or `"native"`.
#' - `auto_table_schema`: optional `shard_schema` for validation/native encoding.
#' - For `dispatch_mode="shm_queue"`:
#' - `block_size`: integer. If provided, overrides the default heuristic for
#' contiguous shard block sizing.
#' - `queue_backing`: one of `"mmap"` or `"shm"` (default `"mmap"`).
#' - `error_log`: logical. If TRUE, workers write a bounded per-worker error
#' log to disk to aid debugging failed tasks (default FALSE).
#' - `error_log_max_lines`: integer. Maximum lines per worker in the error
#' log (default 100).
#' @param workers Integer. Number of worker processes. If NULL, uses existing
#' pool or creates one with `detectCores() - 1`.
#' @param chunk_size Integer. Shards to batch per worker dispatch (default 1).
#' Higher values reduce RPC overhead but may hurt load balancing.
#' @param profile Execution profile: `"default"`, `"memory"` (aggressive recycling),
#' or `"speed"` (minimal overhead). With `profile="speed"`, shard_map will
#' automatically enable `dispatch_mode="shm_queue"` when possible for
#' `shard_map(N, ...)` out-buffer workflows (scalar `N`, `chunk_size=1`),
#' unless `dispatch_mode` is explicitly specified.
#' @param mem_cap Memory cap per worker (e.g., "2GB"). Workers exceeding this
#' are recycled.
#' @param recycle Logical or numeric. If TRUE, recycle workers on RSS drift.
#' If numeric, specifies drift threshold (default 0.5 = 50% growth).
#' @param cow Copy-on-write policy for borrowed inputs: `"deny"` (error on mutation),
#' `"audit"` (detect and flag), or `"allow"` (permit with tracking).
#' @param seed Integer. RNG seed for reproducibility. If NULL, no seed is set.
#' @param diagnostics Logical. Collect detailed diagnostics (default TRUE).
#' @param packages Character vector. Additional packages to load in workers.
#' @param init_expr Expression to evaluate in each worker on startup.
#' @param timeout Numeric. Seconds to wait for each shard (default 3600).
#' @param max_retries Integer. Maximum retries per shard on failure (default 3).
#' @param health_check_interval Integer. Check worker health every N shards (default 10).
#'
#' @return A `shard_result` object containing:
#' - `results`: List of results from each shard (if fun returns values)
#' - `failures`: Any permanently failed shards
#' - `diagnostics`: Timing, memory, and worker statistics
#' - `pool_stats`: Pool-level statistics
#'
#' @export
#' @examples
#' \donttest{
#' blocks <- shards(1000, workers = 2)
#' result <- shard_map(blocks, function(shard) {
#' sum(shard$idx^2)
#' }, workers = 2)
#' pool_stop()
#' }
shard_map <- function(shards,
fun = NULL,
borrow = list(),
out = list(),
kernel = NULL,
scheduler_policy = NULL,
autotune = NULL,
dispatch_mode = c("rpc_chunked", "shm_queue"),
dispatch_opts = NULL,
workers = NULL,
chunk_size = 1L,
profile = c("default", "memory", "speed"),
mem_cap = "2GB",
recycle = TRUE,
cow = c("deny", "audit", "allow"),
seed = NULL,
diagnostics = TRUE,
packages = NULL,
init_expr = NULL,
timeout = 3600,
max_retries = 3L,
health_check_interval = 10L) {
profile <- match.arg(profile)
cow <- match.arg(cow)
dispatch_mode_user_provided <- !missing(dispatch_mode)
dispatch_mode <- if (dispatch_mode_user_provided) match.arg(dispatch_mode) else "rpc_chunked"
if (is.null(dispatch_opts)) dispatch_opts <- list()
if (!is.list(dispatch_opts)) stop("dispatch_opts must be NULL or a list", call. = FALSE)
kernel_meta <- NULL
if (!is.null(kernel)) {
kernel <- as.character(kernel)
km <- get_kernel(kernel)
if (is.null(km)) stop("Unknown kernel: ", kernel, call. = FALSE)
if (!is.null(fun)) stop("Provide either fun= or kernel=, not both", call. = FALSE)
fun <- km$impl
kernel_meta <- km
}
if (!is.function(fun)) stop("fun must be a function (or specify kernel=)", call. = FALSE)
# Start timing
start_time <- Sys.time()
diag <- if (diagnostics) {
list(
start_time = start_time,
health_checks = list(),
shard_times = list(),
worker_usage = list(),
kernel = kernel %||% NULL,
autotune = NULL,
dispatch_mode = dispatch_mode
)
} else {
NULL
}
# If the user passed an integer N, we can optionally do online autotuning
# while generating shards in phases (no up-front huge shard list required).
shards_is_scalar_n <- is.numeric(shards) && length(shards) == 1
n_items <- if (shards_is_scalar_n) as.integer(shards) else NA_integer_
# Determine worker count
if (is.null(workers)) {
pool <- pool_get()
if (!is.null(pool)) {
workers <- pool$n
} else {
dc <- parallel::detectCores()
if (is.na(dc) || dc < 1L) dc <- 1L
workers <- max(dc - 1L, 1L)
}
}
workers <- as.integer(workers)
if (is.na(workers) || workers < 1L) workers <- 1L
# Apply profile settings
profile_settings <- get_profile_settings(profile, mem_cap, recycle)
mem_cap <- profile_settings$mem_cap
rss_drift_threshold <- profile_settings$rss_drift_threshold
health_check_interval <- profile_settings$health_check_interval
# Convert integer to shard_descriptor if needed (after worker/profile resolution).
if (shards_is_scalar_n) {
if (is.na(n_items) || n_items < 1L) stop("shards must be >= 1", call. = FALSE)
} else {
if (!inherits(shards, "shard_descriptor")) {
stop("shards must be a shard_descriptor or integer", call. = FALSE)
}
}
# Validate inputs before expensive pool creation
borrow <- validate_borrow(borrow, cow)
auto_shared_names <- attr(borrow, "auto_shared")
if (length(auto_shared_names) > 0) {
on.exit({
for (nm in auto_shared_names) {
tryCatch(close(borrow[[nm]]), error = function(e) NULL)
}
}, add = TRUE)
}
out <- validate_out(out)
# Ensure pool exists with correct worker count
pool <- ensure_pool(
workers = workers,
mem_cap = mem_cap,
rss_drift_threshold = rss_drift_threshold,
packages = packages,
init_expr = init_expr
)
auto_table <- isTRUE(dispatch_opts$auto_table %||% FALSE)
auto_table_sink <- NULL
if (auto_table) {
if (length(out) > 0) {
stop("dispatch_opts$auto_table=TRUE requires out= to be empty; use out=list(sink=table_sink(...)) for explicit table outputs.", call. = FALSE)
}
auto_table_mode <- as.character(dispatch_opts$auto_table_mode %||% "row_groups")
if (!auto_table_mode %in% c("row_groups", "partitioned")) {
stop("dispatch_opts$auto_table_mode must be 'row_groups' or 'partitioned'", call. = FALSE)
}
auto_table_path <- dispatch_opts$auto_table_path %||% NULL
auto_table_format <- as.character(dispatch_opts$auto_table_format %||% "rds")
if (!auto_table_format %in% c("auto", "rds", "native")) {
stop("dispatch_opts$auto_table_format must be 'auto', 'rds', or 'native'", call. = FALSE)
}
# Schema-less by default to keep ceremony low; users can supply a shard_schema
# via dispatch_opts$auto_table_schema for native encoding + strict validation.
auto_table_schema <- dispatch_opts$auto_table_schema %||% NULL
if (!is.null(auto_table_schema) && !inherits(auto_table_schema, "shard_schema")) {
stop("dispatch_opts$auto_table_schema must be a shard_schema (or NULL)", call. = FALSE)
}
auto_table_sink <- table_sink(
schema = auto_table_schema,
mode = auto_table_mode,
path = auto_table_path,
format = auto_table_format
)
}
# Low-ceremony fast path: profile="speed" will automatically use shm_queue
# for scalar-N, chunk_size=1 out-buffer workflows unless dispatch_mode was
# explicitly set by the user.
if (!dispatch_mode_user_provided &&
identical(profile, "speed") &&
shards_is_scalar_n &&
as.integer(chunk_size) == 1L &&
length(out) > 0 &&
taskq_supported()) {
dispatch_mode <- "shm_queue"
if (diagnostics) diag$dispatch_mode <- dispatch_mode
}
# Set seed in workers if specified
if (!is.null(seed)) {
# When shards are auto-generated online, we don't know num_shards yet.
# Use total items as a stable substream spacing.
set_worker_seeds(pool, seed, if (shards_is_scalar_n) n_items else shards$num_shards)
}
# Export borrowed inputs to workers (once, not per shard)
export_borrow_to_workers(pool, borrow)
# Export output buffer references if any
if (length(out) > 0) {
export_out_to_workers(pool, out)
}
# Optional: auto table sink for tibble/data.frame return values.
if (!is.null(auto_table_sink)) {
export_auto_table_sink_to_workers(pool, auto_table_sink)
}
if (isTRUE(diagnostics)) {
reset_worker_diagnostics_(pool)
}
# shm_queue fast mode: chunk_size=1, fire-and-forget (no gathered results).
if (identical(dispatch_mode, "shm_queue")) {
if (!taskq_supported()) {
warning("dispatch_mode='shm_queue' not supported on this platform; falling back to rpc_chunked", call. = FALSE)
dispatch_mode <- "rpc_chunked"
} else {
if (isTRUE(auto_table)) {
stop("dispatch_opts$auto_table is not supported in dispatch_mode='shm_queue' (use rpc_chunked or explicit out=table_sink())", call. = FALSE)
}
if (as.integer(chunk_size) != 1L) {
stop("dispatch_mode='shm_queue' currently requires chunk_size=1", call. = FALSE)
}
if (length(out) == 0) {
warning("dispatch_mode='shm_queue' does not gather results; prefer using out= buffers/sinks.", call. = FALSE)
}
queue_backing <- dispatch_opts$queue_backing %||% "mmap"
if (shards_is_scalar_n) {
block_size <- dispatch_opts$block_size %||% autotune_block_size(
n = n_items,
workers = workers,
min_shards_per_worker = 4L,
max_shards_per_worker = 64L
)
block_size <- as.integer(block_size)
if (is.na(block_size) || block_size < 1L) stop("dispatch_opts$block_size must be >= 1", call. = FALSE)
shards <- shards_lazy(n_items, block_size = block_size)
dispatch_result <- dispatch_shards_shm_queue_(
n = n_items,
block_size = block_size,
shards = NULL,
fun = fun,
borrow = borrow,
out = out,
pool = pool,
max_retries = max_retries,
timeout = timeout,
queue_backing = queue_backing,
error_log = isTRUE(dispatch_opts$error_log %||% FALSE),
error_log_max_lines = dispatch_opts$error_log_max_lines %||% 100L
)
} else {
if (!inherits(shards, "shard_descriptor")) {
stop("dispatch_mode='shm_queue' requires shard_map(N, ...) or a shard_descriptor", call. = FALSE)
}
dispatch_result <- dispatch_shards_shm_queue_(
n = as.integer(shards$num_shards %||% length(shards$shards)),
block_size = 1L,
shards = shards,
fun = fun,
borrow = borrow,
out = out,
pool = pool,
max_retries = max_retries,
timeout = timeout,
queue_backing = queue_backing,
error_log = isTRUE(dispatch_opts$error_log %||% FALSE),
error_log_max_lines = dispatch_opts$error_log_max_lines %||% 100L
)
}
results <- dispatch_result$results
if (diagnostics) {
diag$end_time <- Sys.time()
diag$duration <- as.numeric(difftime(diag$end_time, diag$start_time, units = "secs"))
diag$health_checks <- dispatch_result$diagnostics$health_checks %||% list()
diag$shards_processed <- shards$num_shards
diag$chunks_dispatched <- shards$num_shards
diag$pool_stats <- dispatch_result$pool_stats
diag$view_stats <- dispatch_result$diagnostics$view_stats %||% NULL
diag$copy_stats <- dispatch_result$diagnostics$copy_stats %||% NULL
diag$table_stats <- dispatch_result$diagnostics$table_stats %||% NULL
diag$scratch_stats <- dispatch_result$diagnostics$scratch_stats %||% NULL
diag$error_logs <- dispatch_result$diagnostics$error_logs %||% list()
diag$shm_queue <- dispatch_result$diagnostics$taskq %||% NULL
}
return(structure(
list(
results = results,
failures = dispatch_result$failures,
shards = shards,
diagnostics = diag,
queue_status = dispatch_result$queue_status,
pool_stats = dispatch_result$pool_stats,
cow_policy = cow,
profile = profile
),
class = "shard_result"
))
}
}
# Create self-contained executor function for workers
chunk_executor <- make_chunk_executor(auto_table = auto_table)
# Optional: online shard sizing autotune for scalar-N sharding. This is opt-in
# by default for shard_map(N, ...) (low ceremony), and off for precomputed
# shard descriptors.
autotune_mode <- NULL
autotune_cfg <- NULL
if (is.list(autotune)) {
autotune_mode <- as.character(autotune$mode %||% "online")
autotune_cfg <- autotune
} else if (!is.null(autotune)) {
autotune_mode <- if (isTRUE(autotune)) "online" else as.character(autotune)
} else if (shards_is_scalar_n) {
autotune_mode <- "online"
} else {
autotune_mode <- "none"
}
if (!nzchar(autotune_mode)) autotune_mode <- "none"
if (shards_is_scalar_n && identical(autotune_mode, "online")) {
tuned <- shard_map_online_(
n = n_items,
fun = fun,
borrow = borrow,
out = out,
kernel_meta = kernel_meta,
chunk_executor = chunk_executor,
auto_table = auto_table,
auto_table_sink = auto_table_sink,
auto_table_materialize = dispatch_opts$auto_table_materialize %||% "auto",
auto_table_max_bytes = dispatch_opts$auto_table_max_bytes %||% (256 * 1024^2),
pool = pool,
workers = workers,
mem_cap = mem_cap,
chunk_size = chunk_size,
autotune_cfg = autotune_cfg,
profile = profile,
diagnostics = diagnostics,
diag = diag,
health_check_interval = health_check_interval,
max_retries = max_retries,
timeout = timeout,
scheduler_policy = scheduler_policy
)
shards <- tuned$shards
dispatch_result <- tuned$dispatch_result
if (diagnostics) diag <- tuned$diag
results <- tuned$results
} else {
# Create chunk batches if chunk_size > 1
if (shards_is_scalar_n) {
shards <- shards(n_items, workers = workers)
}
chunks <- create_shard_chunks(shards, chunk_size, fun, borrow, out, kernel_meta = kernel_meta)
# Dispatch chunks to workers with supervision
dispatch_result <- dispatch_chunks(
chunks = chunks,
fun = chunk_executor,
pool = pool,
health_check_interval = health_check_interval,
max_retries = max_retries,
timeout = timeout,
scheduler_policy = scheduler_policy,
store_results = !auto_table
)
if (auto_table) {
mat <- dispatch_opts$auto_table_materialize %||% "auto"
mx <- dispatch_opts$auto_table_max_bytes %||% (256 * 1024^2)
results <- table_finalize(auto_table_sink, materialize = mat, max_bytes = mx)
} else {
# Flatten results if chunk_size > 1
results <- if (chunk_size > 1L) {
unlist(dispatch_result$results, recursive = FALSE)
} else {
dispatch_result$results
}
}
# Collect diagnostics
if (diagnostics) {
diag$end_time <- Sys.time()
diag$duration <- as.numeric(difftime(diag$end_time, diag$start_time, units = "secs"))
diag$health_checks <- dispatch_result$diagnostics$health_checks
diag$shards_processed <- shards$num_shards
diag$chunks_dispatched <- length(chunks)
diag$pool_stats <- dispatch_result$pool_stats
diag$view_stats <- dispatch_result$diagnostics$view_stats %||% NULL
diag$view_hotspots <- dispatch_result$diagnostics$view_hotspots %||% list()
diag$copy_stats <- dispatch_result$diagnostics$copy_stats %||% NULL
diag$table_stats <- dispatch_result$diagnostics$table_stats %||% NULL
diag$scratch_stats <- dispatch_result$diagnostics$scratch_stats %||% NULL
diag$scheduler <- dispatch_result$diagnostics$scheduler %||% NULL
}
}
# Build result object
structure(
list(
results = results,
failures = dispatch_result$failures,
shards = shards,
diagnostics = diag,
queue_status = dispatch_result$queue_status,
pool_stats = dispatch_result$pool_stats,
cow_policy = cow,
profile = profile
),
class = "shard_result"
)
}
# Online shard sizing for scalar-N shard_map.
#
# This runs a few small phases to pick a reasonable block_size using observed
# wall time and worker RSS, then processes the remainder with the chosen size.
shard_map_online_ <- function(n,
fun,
borrow,
out,
kernel_meta,
chunk_executor,
auto_table = FALSE,
auto_table_sink = NULL,
auto_table_materialize = "auto",
auto_table_max_bytes = 256 * 1024^2,
pool,
workers,
mem_cap,
chunk_size,
autotune_cfg = NULL,
profile,
diagnostics,
diag,
health_check_interval,
max_retries,
timeout,
scheduler_policy) {
n <- as.integer(n)
if (is.na(n) || n < 1L) stop("n must be >= 1", call. = FALSE)
# Conservative defaults; keep user-facing ceremony low by being predictable.
cfg <- list(
max_rounds = 3L,
probe_shards_per_worker = 4L,
min_shard_time = 0.02, # seconds; below this, overhead dominates -> grow block
grow_factor = 2.0,
shrink_factor = 0.5,
rss_hi = 0.85,
rss_lo = 0.50
)
if (is.list(autotune_cfg)) {
for (nm in names(cfg)) {
if (!is.null(autotune_cfg[[nm]])) cfg[[nm]] <- autotune_cfg[[nm]]
}
}
# Initial block size from the existing heuristic.
bs <- autotune_block_size(
n = n,
workers = workers,
min_shards_per_worker = 4L,
max_shards_per_worker = 64L,
scratch_bytes_per_item = 0,
scratch_budget = 0
)
cursor <- 1L
shard_id <- 1L
all_shards <- list()
all_results <- list()
all_failures <- list()
# Aggregate dispatch diagnostics across phases.
agg_diag <- list(
health_checks = list(),
view_stats = list(created = 0L, materialized = 0L, materialized_bytes = 0, packed = 0L, packed_bytes = 0),
view_hotspots = list(),
copy_stats = list(borrow_exports = 0L, borrow_bytes = 0, buffer_writes = 0L, buffer_bytes = 0),
table_stats = list(writes = 0L, rows = 0L, bytes = 0),
scratch_stats = list(hits = 0L, misses = 0L, high_water = 0),
scheduler = list(throttle_events = 0L),
chunks_dispatched = 0L
)
hist <- list()
rounds <- 0L
# Use chunk_size=1 for probe phases so timing per shard is meaningful.
probe_chunk_size <- 1L
while (cursor <= n) {
# Probe in early rounds; afterward, use the run's requested chunk_size.
is_probe <- rounds < cfg$max_rounds
use_chunk_size <- if (is_probe) probe_chunk_size else chunk_size
# Probe only a small prefix; after tuning, take bigger bites.
target_shards <- if (is_probe) workers * cfg$probe_shards_per_worker else workers * 32L
phase_items <- min(n - cursor + 1L, as.integer(bs) * as.integer(max(target_shards, 1L)))
if (phase_items < 1L) phase_items <- 1L
phase_end <- min(cursor + phase_items - 1L, n)
phase_shards <- create_contiguous_shards_window_(
start = cursor,
end = phase_end,
block_size = bs,
start_id = shard_id
)
shard_id <- shard_id + length(phase_shards)
all_shards <- c(all_shards, phase_shards)
phase_desc <- structure(
list(
n = phase_end - cursor + 1L,
block_size = as.integer(bs),
strategy = "contiguous",
num_shards = length(phase_shards),
shards = phase_shards
),
class = "shard_descriptor"
)
chunks <- create_shard_chunks(phase_desc, use_chunk_size, fun, borrow, out, kernel_meta = kernel_meta)
rss_before <- tryCatch(mem_report(pool)$peak_rss, error = function(e) NA_real_)
t0 <- proc.time()[["elapsed"]]
dr <- dispatch_chunks(
chunks = chunks,
fun = chunk_executor,
pool = pool,
health_check_interval = health_check_interval,
max_retries = max_retries,
timeout = timeout,
scheduler_policy = scheduler_policy,
store_results = !isTRUE(auto_table)
)
t1 <- proc.time()[["elapsed"]]
rss_after <- tryCatch(mem_report(pool)$peak_rss, error = function(e) NA_real_)
if (!isTRUE(auto_table)) {
# Flatten phase results into per-shard results and append.
phase_res <- if (use_chunk_size > 1L) unlist(dr$results, recursive = FALSE) else dr$results
all_results <- c(all_results, phase_res)
}
if (length(dr$failures)) all_failures <- c(all_failures, dr$failures)
# Update aggregate diagnostics.
agg_diag$health_checks <- c(agg_diag$health_checks, dr$diagnostics$health_checks %||% list())
agg_diag$chunks_dispatched <- agg_diag$chunks_dispatched + length(chunks)
if (is.list(dr$diagnostics$view_stats)) {
for (k in names(agg_diag$view_stats)) agg_diag$view_stats[[k]] <- (agg_diag$view_stats[[k]] %||% 0) + (dr$diagnostics$view_stats[[k]] %||% 0)
}
if (is.list(dr$diagnostics$view_hotspots) && length(dr$diagnostics$view_hotspots) > 0) {
for (k in names(dr$diagnostics$view_hotspots)) {
cur <- agg_diag$view_hotspots[[k]] %||% list(bytes = 0, count = 0L)
cur$bytes <- (cur$bytes %||% 0) + (dr$diagnostics$view_hotspots[[k]]$bytes %||% 0)
cur$count <- as.integer((cur$count %||% 0L) + (dr$diagnostics$view_hotspots[[k]]$count %||% 0L))
agg_diag$view_hotspots[[k]] <- cur
}
}
if (is.list(dr$diagnostics$copy_stats)) {
for (k in names(agg_diag$copy_stats)) agg_diag$copy_stats[[k]] <- (agg_diag$copy_stats[[k]] %||% 0) + (dr$diagnostics$copy_stats[[k]] %||% 0)
}
if (is.list(dr$diagnostics$table_stats)) {
for (k in names(agg_diag$table_stats)) agg_diag$table_stats[[k]] <- (agg_diag$table_stats[[k]] %||% 0) + (dr$diagnostics$table_stats[[k]] %||% 0)
}
if (is.list(dr$diagnostics$scratch_stats)) {
agg_diag$scratch_stats$hits <- (agg_diag$scratch_stats$hits %||% 0L) + (dr$diagnostics$scratch_stats$hits %||% 0L)
agg_diag$scratch_stats$misses <- (agg_diag$scratch_stats$misses %||% 0L) + (dr$diagnostics$scratch_stats$misses %||% 0L)
agg_diag$scratch_stats$high_water <- max(as.double(agg_diag$scratch_stats$high_water %||% 0), as.double(dr$diagnostics$scratch_stats$high_water %||% 0))
}
if (is.list(dr$diagnostics$scheduler)) {
agg_diag$scheduler$throttle_events <- (agg_diag$scheduler$throttle_events %||% 0L) + as.integer(dr$diagnostics$scheduler$throttle_events %||% 0L)
}
# Phase metrics
elapsed <- as.double(t1 - t0)
items_done <- as.integer(phase_end - cursor + 1L)
shards_done <- length(phase_shards)
throughput <- if (elapsed > 0) as.double(items_done) / elapsed else NA_real_
shard_time <- if (shards_done > 0) elapsed / as.double(shards_done) else NA_real_
rss_peak <- suppressWarnings(max(c(rss_before, rss_after), na.rm = TRUE))
rss_frac <- if (is.finite(rss_peak) && is.finite(mem_cap) && mem_cap > 0) rss_peak / mem_cap else NA_real_
if (is_probe) rounds <- rounds + 1L
hist[[length(hist) + 1L]] <- list(
round = rounds,
start = cursor,
end = phase_end,
block_size = as.integer(bs),
chunk_size = as.integer(use_chunk_size),
elapsed_sec = elapsed,
items = items_done,
shards = shards_done,
throughput_items_per_sec = throughput,
shard_time_sec = shard_time,
rss_peak = rss_peak,
rss_fraction_of_mem_cap = rss_frac
)
cursor <- phase_end + 1L
# Update block_size for next probe phase using simple, safe heuristics.
if (is_probe && cursor <= n) {
bs_next <- bs
reason <- "keep"
# If we get close to mem_cap, shrink.
if (is.finite(rss_frac) && rss_frac >= cfg$rss_hi) {
bs_next <- max(as.integer(floor(as.double(bs) * cfg$shrink_factor)), 1L)
reason <- "shrink_rss"
} else if (is.finite(shard_time) && shard_time < cfg$min_shard_time) {
# If shards are too tiny (overhead dominates), grow.
bs_next <- as.integer(ceiling(as.double(bs) * cfg$grow_factor))
bs_next <- min(bs_next, n)
reason <- "grow_overhead"
} else if (is.finite(rss_frac) && rss_frac <= cfg$rss_lo && is.finite(shard_time) && shard_time < (cfg$min_shard_time * 0.5)) {
# Extra nudge: very low RSS and very small shard time.
bs_next <- as.integer(ceiling(as.double(bs) * cfg$grow_factor))
bs_next <- min(bs_next, n)
reason <- "grow_low_rss"
}
# Record decision.
hist[[length(hist)]]$decision <- reason
hist[[length(hist)]]$next_block_size <- as.integer(bs_next)
bs <- bs_next
}
}
full_desc <- structure(
list(
n = n,
block_size = NA_integer_,
strategy = "contiguous",
num_shards = length(all_shards),
shards = all_shards
),
class = "shard_descriptor"
)
# Produce a unified dispatch_result-like payload.
vh <- agg_diag$view_hotspots %||% list()
if (length(vh) > 0) {
ord <- order(vapply(vh, function(x) as.double(x$bytes %||% 0), numeric(1)), decreasing = TRUE)
vh <- vh[ord]
if (length(vh) > 20) vh <- vh[seq_len(20)]
}
dispatch_result <- structure(
list(
results = all_results,
failures = all_failures,
queue_status = list(
total = agg_diag$chunks_dispatched,
pending = 0L,
in_flight = 0L,
completed = agg_diag$chunks_dispatched - length(all_failures),
failed = length(all_failures),
total_retries = sum(vapply(all_failures, function(x) x$retry_count %||% 0L, integer(1)), na.rm = TRUE)
),
diagnostics = list(
health_checks = agg_diag$health_checks,
view_stats = agg_diag$view_stats,
view_hotspots = vh,
copy_stats = agg_diag$copy_stats,
table_stats = agg_diag$table_stats,
scratch_stats = agg_diag$scratch_stats,
scheduler = agg_diag$scheduler
),
pool_stats = pool_get()$stats
),
class = "shard_dispatch_result"
)
if (diagnostics) {
diag$end_time <- Sys.time()
diag$duration <- as.numeric(difftime(diag$end_time, diag$start_time, units = "secs"))
diag$health_checks <- agg_diag$health_checks
diag$shards_processed <- full_desc$num_shards
diag$chunks_dispatched <- agg_diag$chunks_dispatched
diag$pool_stats <- dispatch_result$pool_stats
diag$view_stats <- agg_diag$view_stats
diag$view_hotspots <- vh
diag$copy_stats <- agg_diag$copy_stats
diag$table_stats <- agg_diag$table_stats
diag$scratch_stats <- agg_diag$scratch_stats
diag$scheduler <- agg_diag$scheduler
diag$autotune <- list(mode = "online", history = hist)
}
list(
shards = full_desc,
results = if (isTRUE(auto_table)) {
if (is.null(auto_table_sink)) stop("auto_table enabled but auto_table_sink is NULL", call. = FALSE)
table_finalize(auto_table_sink, materialize = auto_table_materialize, max_bytes = auto_table_max_bytes)
} else {
all_results
},
dispatch_result = dispatch_result,
diag = diag
)
}
#' Get Profile Settings
#'
#' Returns settings based on execution profile.
#'
#' @param profile Profile name.
#' @param mem_cap User-specified memory cap.
#' @param recycle User-specified recycle setting.
#' @return List of settings.
#' @keywords internal
#' @noRd
get_profile_settings <- function(profile, mem_cap, recycle) {
settings <- list(
mem_cap = parse_bytes(mem_cap),
rss_drift_threshold = if (is.numeric(recycle)) recycle else 0.5,
health_check_interval = 10L
)
switch(profile,
"memory" = {
settings$rss_drift_threshold <- 0.25 # More aggressive recycling
settings$health_check_interval <- 5L
},
"speed" = {
settings$rss_drift_threshold <- 1.0 # Less recycling
settings$health_check_interval <- 50L
}
)
if (isFALSE(recycle)) {
settings$rss_drift_threshold <- Inf # Disable recycling
}
settings
}
#' Ensure Pool Exists
#'
#' Creates or validates worker pool.
#'
#' @param workers Number of workers.
#' @param mem_cap Memory cap in bytes.
#' @param rss_drift_threshold Drift threshold.
#' @param packages Packages to load.
#' @param init_expr Init expression.
#' @return Pool object.
#' @keywords internal
#' @noRd
ensure_pool <- function(workers, mem_cap, rss_drift_threshold, packages, init_expr) {
pool <- pool_get()
# Check if existing pool is suitable
if (!is.null(pool)) {
if (pool$n == workers) {
return(pool)
}
# Pool exists but wrong size - stop it
pool_stop()
}
# Create new pool
pool_create(
n = workers,
rss_limit = mem_cap,
rss_drift_threshold = rss_drift_threshold,
packages = packages,
init_expr = init_expr
)
pool_get()
}
#' Validate Borrowed Inputs
#'
#' Validates and prepares borrowed inputs.
#'
#' @param borrow List of inputs.
#' @param cow COW policy.
#' @return Validated borrow list.
#' @keywords internal
#' @noRd
validate_borrow <- function(borrow, cow) {
if (length(borrow) == 0) return(borrow)
if (!is.list(borrow) || is.null(names(borrow))) {
stop("borrow must be a named list", call. = FALSE)
}
if (any(names(borrow) == "")) {
stop("All borrowed inputs must be named", call. = FALSE)
}
auto_shared <- character(0)
# Auto-share large atomic inputs once in the main process so PSOCK workers
# can receive a small descriptor (via ALTREP serialization) instead of full
# data copies.
for (name in names(borrow)) {
x <- borrow[[name]]
if (is.atomic(x) && !is.null(x) &&
typeof(x) %in% c("double", "integer", "logical", "raw") &&
!is_shared_vector(x)) {
# Build with cow='allow' so we can attach attributes, then lock down to
# the requested policy.
shared <- as_shared(x, readonly = TRUE, backing = "auto", cow = "allow")
# Preserve non-class attributes (dim, dimnames, names, tsp, etc).
attrs <- attributes(x)
x_class <- attr(x, "class")
attrs$class <- NULL
if (length(attrs)) {
for (nm in names(attrs)) {
attr(shared, nm) <- attrs[[nm]]
}
}
class(shared) <- unique(c("shard_shared_vector", x_class))
attr(shared, "shard_cow") <- cow
borrow[[name]] <- shared
x <- shared
auto_shared <- c(auto_shared, name)
}
# Best-effort tag for downstream diagnostics.
if (!is_shared_vector(x)) {
attr(borrow[[name]], "shard_cow") <- cow
} else {
existing <- attr(x, "shard_cow", exact = TRUE)
if (!is.null(existing) && is.character(existing) &&
length(existing) == 1L && !identical(existing, cow)) {
warning("Borrowed input '", name, "' has shard_cow='", existing,
"' but shard_map(cow='", cow, "') was requested. Using '",
existing, "'.", call. = FALSE)
}
}
}
attr(borrow, "auto_shared") <- auto_shared
borrow
}
#' Validate Output Buffers
#'
#' Validates output buffer specifications.
#'
#' @param out List of output buffers.
#' @return Validated out list.
#' @keywords internal
#' @noRd
validate_out <- function(out) {
if (length(out) == 0) return(out)
if (!is.list(out) || is.null(names(out))) {
stop("out must be a named list", call. = FALSE)
}
if (any(names(out) == "")) {
stop("All output buffers must be named", call. = FALSE)
}
bad <- vapply(out, function(x) {
!(inherits(x, "shard_buffer") ||
inherits(x, "shard_table_buffer") ||
inherits(x, "shard_table_sink"))
}, logical(1))
if (any(bad)) {
stop("All outputs must be shard_buffer, shard_table_buffer, or shard_table_sink objects.",
call. = FALSE)
}
out
}
#' Set Worker Seeds
#'
#' Sets reproducible RNG seeds in workers.
#'
#' @param pool Worker pool.
#' @param seed Base seed.
#' @param num_shards Number of shards for substream calculation.
#' @keywords internal
#' @noRd
set_worker_seeds <- function(pool, seed, num_shards) {
for (i in seq_len(pool$n)) {
worker_seed <- seed + (i - 1L) * num_shards
tryCatch({
parallel::clusterCall(pool$workers[[i]]$cluster, function(s) {
set.seed(s)
}, worker_seed)
}, error = function(e) NULL)
}
}
#' Export Borrowed Inputs to Workers
#'
#' Exports borrowed data to all workers (once, reused across shards).
#'
#' @param pool Worker pool.
#' @param borrow List of borrowed inputs.
#' @keywords internal
#' @noRd
export_borrow_to_workers <- function(pool, borrow) {
if (length(borrow) == 0) return(invisible(NULL))
# Create an environment with the borrowed data
export_env <- new.env(parent = emptyenv())
export_env$.shard_borrow <- borrow
# Export to all workers
for (i in seq_len(pool$n)) {
w <- pool$workers[[i]]
if (!is.null(w) && worker_is_alive(w)) {
tryCatch({
parallel::clusterExport(w$cluster, ".shard_borrow", envir = export_env)
}, error = function(e) {
warning("Failed to export borrow to worker ", i, ": ", conditionMessage(e))
})
}
}
invisible(NULL)
}
#' Export Output Buffers to Workers
#'
#' Exports output buffer references to workers.
#'
#' @param pool Worker pool.
#' @param out List of output buffers.
#' @keywords internal
#' @noRd
export_out_to_workers <- function(pool, out) {
if (length(out) == 0) return(invisible(NULL))
# Export reopenable descriptors rather than shard_buffer objects. The raw
# segment externalptr does not survive PSOCK serialization.
describe_one <- function(obj) {
if (inherits(obj, "shard_buffer")) {
info <- buffer_info(obj)
return(list(
kind = "buffer",
path = info$path,
backing = info$backing,
type = info$type,
dim = info$dim
))
}
if (inherits(obj, "shard_table_buffer")) {
cols <- lapply(obj$columns, function(buf) {
info <- buffer_info(buf)
list(path = info$path, backing = info$backing, type = info$type, dim = info$dim)
})
return(list(
kind = "table_buffer",
schema = obj$schema,
nrow = obj$nrow,
backing = obj$backing,
columns = cols
))
}
if (inherits(obj, "shard_table_sink")) {
return(list(
kind = "table_sink",
schema = obj$schema,
mode = obj$mode,
path = obj$path,
format = obj$format
))
}
stop("Unsupported out object type", call. = FALSE)
}
out_desc <- lapply(out, describe_one)
export_env <- new.env(parent = emptyenv())
export_env$.shard_out <- out_desc
for (i in seq_len(pool$n)) {
w <- pool$workers[[i]]
if (!is.null(w) && worker_is_alive(w)) {
tryCatch({
parallel::clusterExport(w$cluster, ".shard_out", envir = export_env)
}, error = function(e) {
warning("Failed to export out to worker ", i, ": ", conditionMessage(e))
})
}
}
invisible(NULL)
}
export_auto_table_sink_to_workers <- function(pool, sink) {
if (is.null(sink)) return(invisible(NULL))
if (!inherits(sink, "shard_table_sink")) {
stop("sink must be a shard_table_sink", call. = FALSE)
}
export_env <- new.env(parent = emptyenv())
export_env$.shard_auto_table_sink <- list(
schema = sink$schema,
mode = sink$mode,
path = sink$path,
format = sink$format
)
for (i in seq_len(pool$n)) {
w <- pool$workers[[i]]
if (!is.null(w) && worker_is_alive(w)) {
tryCatch({
parallel::clusterExport(w$cluster, ".shard_auto_table_sink", envir = export_env)
}, error = function(e) {
warning("Failed to export auto table sink to worker ", i, ": ", conditionMessage(e))
})
}
}
invisible(NULL)
}
reset_worker_diagnostics_ <- function(pool) {
# Best-effort: reset per-process counters so run telemetry is clean and
# attribution (e.g. view materialization hotspots) isn't polluted by previous runs.
for (i in seq_len(pool$n)) {
w <- pool$workers[[i]]
if (!is.null(w) && worker_is_alive(w)) {
tryCatch({
parallel::clusterCall(w$cluster, function() {
f1 <- tryCatch(get("view_reset_diagnostics", asNamespace("shard")), error = function(e) NULL)
f2 <- tryCatch(get("buffer_reset_diagnostics", asNamespace("shard")), error = function(e) NULL)
f3 <- tryCatch(get("table_reset_diagnostics", asNamespace("shard")), error = function(e) NULL)
f4 <- tryCatch(get("scratch_reset_diagnostics", asNamespace("shard")), error = function(e) NULL)
if (is.function(f1)) tryCatch(f1(), error = function(e) NULL)
if (is.function(f2)) tryCatch(f2(), error = function(e) NULL)
if (is.function(f3)) tryCatch(f3(), error = function(e) NULL)
if (is.function(f4)) tryCatch(f4(), error = function(e) NULL)
if (exists(".shard_view_hotspot_snapshot", envir = .shard_worker_env, inherits = FALSE)) {
rm(".shard_view_hotspot_snapshot", envir = .shard_worker_env)
}
NULL
})
}, error = function(e) NULL)
}
}
invisible(NULL)
}
#' Create Shard Chunks
#'
#' Groups shards into chunks for dispatch. Each chunk contains a self-contained
#' executor function that can run in a worker without needing package functions.
#'
#' @param shards Shard descriptor.
#' @param chunk_size Shards per chunk.
#' @param fun User function.
#' @param borrow Borrowed inputs.
#' @param out Output buffers.
#' @return List of chunk descriptors.
#' @keywords internal
#' @noRd
create_shard_chunks <- function(shards, chunk_size, fun, borrow, out, kernel_meta = NULL) {
chunk_size <- max(as.integer(chunk_size), 1L)
num_chunks <- ceiling(shards$num_shards / chunk_size)
chunks <- vector("list", num_chunks)
borrow_names <- names(borrow)
out_names <- names(out)
classify_bytes <- function(bytes) {
bytes <- as.double(bytes)
if (!is.finite(bytes) || is.na(bytes)) return("tiny")
if (bytes >= 64 * 1024^2) return("huge")
if (bytes >= 8 * 1024^2) return("medium")
"tiny"
}
for (i in seq_len(num_chunks)) {
start_idx <- (i - 1L) * chunk_size + 1L
end_idx <- min(i * chunk_size, shards$num_shards)
chunk_shards <- shards$shards[start_idx:end_idx]
# Optional footprint hint for memory-aware scheduling.
fp_class <- NULL
fp_bytes <- NULL
if (!is.null(kernel_meta) && !is.null(kernel_meta$footprint)) {
fp <- kernel_meta$footprint
if (is.numeric(fp) && length(fp) == 1L) {
fp_bytes <- as.double(fp)
fp_class <- classify_bytes(fp_bytes)
} else if (is.function(fp)) {
vals <- lapply(chunk_shards, function(s) {
tryCatch(fp(s), error = function(e) NULL)
})
# Accept either numeric bytes or list(class=..., bytes=...).
bytes <- vapply(vals, function(v) {
if (is.null(v)) return(NA_real_)
if (is.numeric(v)) return(as.double(v[[1]]))
if (is.list(v) && !is.null(v$bytes)) return(as.double(v$bytes))
NA_real_
}, numeric(1))
fp_bytes <- suppressWarnings(max(bytes, na.rm = TRUE))
if (!is.finite(fp_bytes)) fp_bytes <- NULL
cls <- vapply(vals, function(v) {
if (is.list(v) && !is.null(v$class)) as.character(v$class) else NA_character_
}, character(1))
cls <- cls[!is.na(cls) & nzchar(cls)]
fp_class <- if (length(cls) > 0) cls[[1]] else if (!is.null(fp_bytes)) classify_bytes(fp_bytes) else NULL
}
}
chunks[[i]] <- list(
id = i,
shard_ids = start_idx:end_idx,
shards = chunk_shards,
fun = fun,
borrow_names = borrow_names,
out_names = out_names,
footprint_class = fp_class,
footprint_bytes = fp_bytes
)
}
chunks
}
#' Create Chunk Executor Function
#'
#' Creates a self-contained function that can execute a chunk in a worker.
#' This function is passed to dispatch_chunks and runs entirely within
#' the worker process.
#'
#' @return A function that executes chunks.
#' @keywords internal
#' @noRd
make_chunk_executor <- function(auto_table = FALSE) {
out_desc_key_ <- function(d) {
# A stable identifier for deciding whether a cached out handle can be reused.
kind <- d$kind %||% "buffer"
if (identical(kind, "buffer")) {
return(paste0("buffer|", d$path, "|", d$backing, "|", d$type, "|", paste(d$dim, collapse = "x")))
}
if (identical(kind, "table_buffer")) {
col_keys <- vapply(d$columns, function(cd) {
paste0(cd$path, "|", cd$backing, "|", cd$type, "|", paste(cd$dim, collapse = "x"))
}, character(1))
return(paste0("table_buffer|", paste(names(col_keys), col_keys, sep = "=", collapse = ";")))
}
if (identical(kind, "table_sink")) {
return(paste0("table_sink|", d$path, "|", d$format, "|", d$mode))
}
paste0("unknown|", kind)
}
open_out_one_ <- function(d) {
kind <- d$kind %||% "buffer"
if (identical(kind, "buffer")) {
buf <- buffer_open(
path = d$path,
type = d$type,
dim = d$dim,
backing = d$backing,
readonly = FALSE
)
return(buf)
}
if (identical(kind, "table_buffer")) {
cols <- list()
for (cn in names(d$columns)) {
cd <- d$columns[[cn]]
cols[[cn]] <- buffer_open(
path = cd$path,
type = cd$type,
dim = cd$dim,
backing = cd$backing,
readonly = FALSE
)
}
tb <- structure(
list(schema = d$schema, nrow = as.integer(d$nrow), backing = d$backing, columns = cols),
class = "shard_table_buffer"
)
return(tb)
}
if (identical(kind, "table_sink")) {
ts <- structure(
list(schema = d$schema, mode = d$mode, path = d$path, format = d$format),
class = "shard_table_sink"
)
return(ts)
}
stop("Unsupported out descriptor kind: ", kind, call. = FALSE)
}
close_out_one_ <- function(obj) {
if (inherits(obj, "shard_buffer")) {
tryCatch(buffer_close(obj), error = function(e) NULL)
return(invisible(NULL))
}
if (inherits(obj, "shard_table_buffer")) {
if (!is.null(obj$columns) && is.list(obj$columns)) {
for (col in obj$columns) {
if (inherits(col, "shard_buffer")) {
tryCatch(buffer_close(col), error = function(e) NULL)
}
}
}
return(invisible(NULL))
}
invisible(NULL)
}
# This function runs inside workers
function(chunk) {
# Get borrowed inputs and outputs from worker environment
borrow <- if (exists(".shard_borrow", envir = globalenv())) {
get(".shard_borrow", envir = globalenv())
} else {
list()
}
out_desc <- if (exists(".shard_out", envir = globalenv())) {
get(".shard_out", envir = globalenv())
} else {
list()
}
# Lazily open output buffers once per worker process and cache them.
out <- list()
if (length(out_desc) > 0) {
if (!exists(".shard_out_opened", envir = .shard_worker_env, inherits = FALSE)) {
assign(".shard_out_opened", new.env(parent = emptyenv()), envir = .shard_worker_env)
}
opened <- get(".shard_out_opened", envir = .shard_worker_env)
for (nm in names(out_desc)) {
d <- out_desc[[nm]]
want_key <- out_desc_key_(d)
entry <- if (exists(nm, envir = opened, inherits = FALSE)) opened[[nm]] else NULL
cur_key <- NULL
cur_obj <- NULL
if (!is.null(entry) && is.list(entry) && !is.null(entry$key) && !is.null(entry$obj)) {
cur_key <- entry$key
cur_obj <- entry$obj
} else if (!is.null(entry)) {
cur_obj <- entry
}
if (is.null(cur_obj) || !identical(cur_key, want_key)) {
if (!is.null(cur_obj)) close_out_one_(cur_obj)
new_obj <- open_out_one_(d)
opened[[nm]] <- list(key = want_key, obj = new_obj)
}
out[[nm]] <- opened[[nm]]$obj
}
}
# Get the user function from the chunk
fun <- chunk$fun
borrow_names <- chunk$borrow_names
out_names <- chunk$out_names
if (isTRUE(auto_table)) {
# Low-ceremony table outputs: if the user function returns a data.frame,
# write it as a row-group partition (one file per shard id). This avoids
# building a giant list + bind_rows() on the master.
if (!exists(".shard_auto_table_sink", envir = globalenv(), inherits = FALSE)) {
stop("auto_table is enabled but no auto table sink is available in the worker", call. = FALSE)
}
d <- get(".shard_auto_table_sink", envir = globalenv(), inherits = FALSE)
sink <- structure(
list(schema = d$schema, mode = d$mode, path = d$path, format = d$format),
class = "shard_table_sink"
)
for (shard in chunk$shards) {
args <- list(shard)
for (name in borrow_names) args[[name]] <- borrow[[name]]
for (name in out_names) args[[name]] <- out[[name]]
val <- do.call(fun, args, quote = TRUE)
if (is.null(val)) next
if (!is.data.frame(val)) {
stop("auto_table requires fun() to return a data.frame/tibble (or NULL) for all shards", call. = FALSE)
}
sid <- as.integer(shard$id %||% NA_integer_)
if (is.na(sid) || sid < 1L) stop("Invalid shard id for table_write()", call. = FALSE)
table_write(sink, sid, val)
}
return(NULL)
}
# Execute for each shard in the chunk (return values gathered to master).
lapply(chunk$shards, function(shard) {
args <- list(shard)
for (name in borrow_names) args[[name]] <- borrow[[name]]
for (name in out_names) args[[name]] <- out[[name]]
# `do.call()` has a sharp edge: if an argument value is a language object,
# it will be spliced into the call and evaluated (surprising for "data"
# being passed through borrow/out). Using quote=TRUE ensures language
# objects are passed as values, not executed as code.
do.call(fun, args, quote = TRUE)
})
}
}
#' Print a shard_result Object
#'
#' @param x A \code{shard_result} object.
#' @param ... Further arguments (ignored).
#' @return The input \code{x}, invisibly.
#' @export
#' @examples
#' \donttest{
#' result <- shard_map(4L, function(shard) shard$idx, workers = 2)
#' pool_stop()
#' print(result)
#' }
print.shard_result <- function(x, ...) {
cat("shard_map result\n")
if (!is.null(x$diagnostics)) {
cat(" Duration:", sprintf("%.2f seconds", x$diagnostics$duration), "\n")
cat(" Shards:", x$diagnostics$shards_processed, "\n")
cat(" Chunks:", x$diagnostics$chunks_dispatched, "\n")
}
status <- x$queue_status
if (!is.null(status)) {
cat(" Completed:", status$completed,
if (status$failed > 0) paste0(" (", status$failed, " failed)") else "", "\n")
if (status$total_retries > 0) {
cat(" Retries:", status$total_retries, "\n")
}
}
if (!is.null(x$pool_stats)) {
cat(" Worker recycles:", x$pool_stats$total_recycles, "\n")
cat(" Worker deaths:", x$pool_stats$total_deaths, "\n")
}
cat(" Profile:", x$profile, "\n")
cat(" COW policy:", x$cow_policy, "\n")
if (length(x$failures) > 0) {
cat(" Failed shards:", length(x$failures), "\n")
}
invisible(x)
}
#' Extract Results from shard_map
#'
#' @param x A shard_result object.
#' @param flatten Logical. Flatten nested results?
#' @return List or vector of results.
#' @export
#' @examples
#' \donttest{
#' result <- shard_map(4L, function(shard) shard$idx[[1L]], workers = 2)
#' pool_stop()
#' results(result)
#' }
results <- function(x, flatten = TRUE) {
if (!inherits(x, "shard_result")) {
stop("x must be a shard_result object", call. = FALSE)
}
res <- x$results
if (inherits(res, c("shard_row_groups", "shard_dataset", "shard_table_handle"))) {
return(res)
}
if (is.data.frame(res)) {
return(res)
}
if (inherits(res, "shard_results_placeholder")) {
# Avoid unlist() on a placeholder (would allocate enormous objects).
return(res)
}
if (!is.list(res)) {
# Unusual but allowed (e.g., auto-materialized scalar results).
return(res)
}
if (flatten && length(res) > 0) {
# Try to simplify
tryCatch(
unlist(res, recursive = FALSE),
error = function(e) res
)
} else {
res
}
}
#' Check if shard_map Succeeded
#'
#' @param x A shard_result object.
#' @return Logical. TRUE if no failures.
#' @export
#' @examples
#' \donttest{
#' result <- shard_map(4L, function(shard) shard$idx[[1L]], workers = 2)
#' pool_stop()
#' succeeded(result)
#' }
succeeded <- function(x) {
if (inherits(x, "shard_result") || inherits(x, "shard_reduce_result")) {
return(length(x$failures) == 0)
}
stop("x must be a shard_result or shard_reduce_result object", call. = FALSE)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.