Nothing
#' @title Chunk Queue Management
#' @description Queue management for dispatching chunks to workers with requeue support.
#' @name queue
NULL
#' Create a Chunk Queue
#'
#' Creates a queue for managing chunk dispatch with support for requeuing
#' failed chunks when workers die or are recycled.
#'
#' @param chunks List of chunk descriptors. Each chunk should have an `id` field.
#' @return A `shard_queue` object with queue management methods.
#' @keywords internal
#' @noRd
queue_create <- function(chunks) {
# Ensure each chunk has an ID
for (i in seq_along(chunks)) {
if (is.null(chunks[[i]]$id)) {
chunks[[i]]$id <- i
}
}
env <- new.env(parent = emptyenv())
env$pending <- chunks
env$in_flight <- list()
env$completed <- list()
env$failed <- list()
env$assignment <- list() # chunk_id -> worker_id mapping
env$order <- vapply(chunks, function(x) as.character(x$id), character(1))
structure(
list(
env = env,
total = length(chunks)
),
class = "shard_queue"
)
}
#' Get Next Chunk from Queue
#'
#' @param queue A `shard_queue` object.
#' @param worker_id Worker requesting the chunk.
#' @return A chunk descriptor or NULL if queue is empty.
#' @keywords internal
#' @noRd
queue_next <- function(queue, worker_id) {
env <- queue$env
if (length(env$pending) == 0) {
return(NULL)
}
chunk <- env$pending[[1]]
env$pending <- env$pending[-1]
# Track assignment
chunk_id <- as.character(chunk$id)
env$in_flight[[chunk_id]] <- chunk
env$assignment[[chunk_id]] <- worker_id
chunk
}
queue_next_where <- function(queue, worker_id, predicate = NULL) {
env <- queue$env
if (length(env$pending) == 0) return(NULL)
if (is.null(predicate)) return(queue_next(queue, worker_id))
if (!is.function(predicate)) stop("predicate must be a function or NULL", call. = FALSE)
idx <- NA_integer_
for (i in seq_along(env$pending)) {
ch <- env$pending[[i]]
ok <- FALSE
ok <- tryCatch(isTRUE(predicate(ch)), error = function(e) FALSE)
if (ok) {
idx <- i
break
}
}
if (is.na(idx)) return(NULL)
chunk <- env$pending[[idx]]
env$pending <- env$pending[-idx]
chunk_id <- as.character(chunk$id)
env$in_flight[[chunk_id]] <- chunk
env$assignment[[chunk_id]] <- worker_id
chunk
}
#' Mark Chunk as Completed
#'
#' @param queue A `shard_queue` object.
#' @param chunk_id Chunk identifier.
#' @param result The result from processing.
#' @return NULL (invisibly).
#' @keywords internal
#' @noRd
queue_complete <- function(queue, chunk_id, result = NULL, retain = TRUE) {
env <- queue$env
chunk_id <- as.character(chunk_id)
if (!is.null(env$in_flight[[chunk_id]])) {
chunk <- env$in_flight[[chunk_id]]
if (isTRUE(retain)) {
chunk$result <- result
chunk$completed_at <- Sys.time()
env$completed[[chunk_id]] <- chunk
} else {
# Keep only minimal completion metadata to avoid retaining large shard lists
# when callers are doing streaming reductions.
env$completed[[chunk_id]] <- list(
id = chunk$id,
completed_at = Sys.time(),
retry_count = chunk$retry_count %||% 0L,
result = result
)
}
env$in_flight[[chunk_id]] <- NULL
env$assignment[[chunk_id]] <- NULL
}
invisible(NULL)
}
#' Mark Chunk as Failed
#'
#' @param queue A `shard_queue` object.
#' @param chunk_id Chunk identifier.
#' @param error The error that occurred.
#' @param requeue Logical. Whether to requeue for retry.
#' @return NULL (invisibly).
#' @keywords internal
#' @noRd
queue_fail <- function(queue, chunk_id, error = NULL, requeue = TRUE) {
env <- queue$env
chunk_id <- as.character(chunk_id)
chunk <- env$in_flight[[chunk_id]]
if (is.null(chunk)) {
return(invisible(NULL))
}
env$in_flight[[chunk_id]] <- NULL
env$assignment[[chunk_id]] <- NULL
# Track retry count
chunk$retry_count <- (chunk$retry_count %||% 0L) + 1L
chunk$last_error <- error
if (requeue) {
# Put back in pending queue for retry
env$pending <- c(env$pending, list(chunk))
} else {
# Mark as permanently failed
chunk$failed_at <- Sys.time()
env$failed[[chunk_id]] <- chunk
}
invisible(NULL)
}
#' Requeue All Chunks for a Worker
#'
#' When a worker dies or is recycled, requeue all its in-flight chunks.
#'
#' @param queue A `shard_queue` object.
#' @param worker_id Worker identifier.
#' @return Integer. Number of chunks requeued.
#' @keywords internal
#' @noRd
queue_requeue_worker <- function(queue, worker_id) {
env <- queue$env
requeued <- 0L
chunk_ids <- names(env$assignment)
for (chunk_id in chunk_ids) {
if (identical(env$assignment[[chunk_id]], worker_id)) {
queue_fail(queue, chunk_id, error = "worker_recycled", requeue = TRUE)
requeued <- requeued + 1L
}
}
requeued
}
#' Check if Queue is Done
#'
#' @param queue A `shard_queue` object.
#' @return Logical. TRUE if all chunks are completed or failed.
#' @keywords internal
#' @noRd
queue_is_done <- function(queue) {
env <- queue$env
length(env$pending) == 0 && length(env$in_flight) == 0
}
#' Check if Queue Has Work
#'
#' @param queue A `shard_queue` object.
#' @return Logical. TRUE if there are pending chunks.
#' @keywords internal
#' @noRd
queue_has_pending <- function(queue) {
length(queue$env$pending) > 0
}
#' Get Queue Status
#'
#' @param queue A `shard_queue` object.
#' @return A list with queue statistics.
#' @keywords internal
#' @noRd
queue_status <- function(queue) {
env <- queue$env
# Count retries
total_retries <- sum(vapply(
c(env$completed, env$pending, env$in_flight, env$failed),
function(x) x$retry_count %||% 0L,
integer(1)
))
list(
total = queue$total,
pending = length(env$pending),
in_flight = length(env$in_flight),
completed = length(env$completed),
failed = length(env$failed),
total_retries = total_retries
)
}
#' Get Queue Results
#'
#' @param queue A `shard_queue` object.
#' @return List of results from completed chunks.
#' @keywords internal
#' @noRd
queue_results <- function(queue) {
env <- queue$env
completed <- env$completed
ids <- names(completed)
if (length(ids) == 0) return(list())
# Prefer numeric id ordering for predictable, lapply/sapply-like behavior.
num_ids <- suppressWarnings(as.integer(ids))
if (!anyNA(num_ids)) {
completed <- completed[order(num_ids)]
} else if (!is.null(env$order)) {
ord_ids <- intersect(env$order, ids)
completed <- completed[ord_ids]
}
lapply(completed, function(x) x$result)
}
#' Get Failed Chunks
#'
#' @param queue A `shard_queue` object.
#' @return List of failed chunk descriptors.
#' @keywords internal
#' @noRd
queue_failures <- function(queue) {
queue$env$failed
}
#' @param x A \code{shard_queue} object.
#' @param ... Further arguments (ignored).
#' @return The input \code{x}, invisibly.
#' @export
#' @noRd
print.shard_queue <- function(x, ...) {
status <- queue_status(x)
cat("shard chunk queue\n")
cat(" Total chunks:", status$total, "\n")
cat(" Pending:", status$pending, "\n")
cat(" In flight:", status$in_flight, "\n")
cat(" Completed:", status$completed, "\n")
cat(" Failed:", status$failed, "\n")
cat(" Total retries:", status$total_retries, "\n")
invisible(x)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.