R/queue.R

Defines functions print.shard_queue queue_failures queue_results queue_status queue_has_pending queue_is_done queue_requeue_worker queue_fail queue_complete queue_next_where queue_next queue_create

#' @title Chunk Queue Management
#' @description Queue management for dispatching chunks to workers with requeue support.
#' @name queue
NULL

#' Create a Chunk Queue
#'
#' Creates a queue for managing chunk dispatch with support for requeuing
#' failed chunks when workers die or are recycled.
#'
#' @param chunks List of chunk descriptors. Each chunk should have an `id` field.
#' @return A `shard_queue` object with queue management methods.
#' @keywords internal
#' @noRd
queue_create <- function(chunks) {
  # Ensure each chunk has an ID
  for (i in seq_along(chunks)) {
    if (is.null(chunks[[i]]$id)) {
      chunks[[i]]$id <- i
    }
  }

  env <- new.env(parent = emptyenv())
  env$pending <- chunks
  env$in_flight <- list()
  env$completed <- list()
  env$failed <- list()
  env$assignment <- list()  # chunk_id -> worker_id mapping
  env$order <- vapply(chunks, function(x) as.character(x$id), character(1))

  structure(
    list(
      env = env,
      total = length(chunks)
    ),
    class = "shard_queue"
  )
}

#' Get Next Chunk from Queue
#'
#' @param queue A `shard_queue` object.
#' @param worker_id Worker requesting the chunk.
#' @return A chunk descriptor or NULL if queue is empty.
#' @keywords internal
#' @noRd
queue_next <- function(queue, worker_id) {
  env <- queue$env

  if (length(env$pending) == 0) {
    return(NULL)
  }

  chunk <- env$pending[[1]]
  env$pending <- env$pending[-1]

  # Track assignment
  chunk_id <- as.character(chunk$id)
  env$in_flight[[chunk_id]] <- chunk
  env$assignment[[chunk_id]] <- worker_id

  chunk
}

queue_next_where <- function(queue, worker_id, predicate = NULL) {
  env <- queue$env
  if (length(env$pending) == 0) return(NULL)
  if (is.null(predicate)) return(queue_next(queue, worker_id))
  if (!is.function(predicate)) stop("predicate must be a function or NULL", call. = FALSE)

  idx <- NA_integer_
  for (i in seq_along(env$pending)) {
    ch <- env$pending[[i]]
    ok <- FALSE
    ok <- tryCatch(isTRUE(predicate(ch)), error = function(e) FALSE)
    if (ok) {
      idx <- i
      break
    }
  }
  if (is.na(idx)) return(NULL)

  chunk <- env$pending[[idx]]
  env$pending <- env$pending[-idx]

  chunk_id <- as.character(chunk$id)
  env$in_flight[[chunk_id]] <- chunk
  env$assignment[[chunk_id]] <- worker_id
  chunk
}

#' Mark Chunk as Completed
#'
#' @param queue A `shard_queue` object.
#' @param chunk_id Chunk identifier.
#' @param result The result from processing.
#' @return NULL (invisibly).
#' @keywords internal
#' @noRd
queue_complete <- function(queue, chunk_id, result = NULL, retain = TRUE) {
  env <- queue$env
  chunk_id <- as.character(chunk_id)

  if (!is.null(env$in_flight[[chunk_id]])) {
    chunk <- env$in_flight[[chunk_id]]
    if (isTRUE(retain)) {
      chunk$result <- result
      chunk$completed_at <- Sys.time()
      env$completed[[chunk_id]] <- chunk
    } else {
      # Keep only minimal completion metadata to avoid retaining large shard lists
      # when callers are doing streaming reductions.
      env$completed[[chunk_id]] <- list(
        id = chunk$id,
        completed_at = Sys.time(),
        retry_count = chunk$retry_count %||% 0L,
        result = result
      )
    }
    env$in_flight[[chunk_id]] <- NULL
    env$assignment[[chunk_id]] <- NULL
  }

  invisible(NULL)
}

#' Mark Chunk as Failed
#'
#' @param queue A `shard_queue` object.
#' @param chunk_id Chunk identifier.
#' @param error The error that occurred.
#' @param requeue Logical. Whether to requeue for retry.
#' @return NULL (invisibly).
#' @keywords internal
#' @noRd
queue_fail <- function(queue, chunk_id, error = NULL, requeue = TRUE) {
  env <- queue$env
  chunk_id <- as.character(chunk_id)

  chunk <- env$in_flight[[chunk_id]]
  if (is.null(chunk)) {
    return(invisible(NULL))
  }

  env$in_flight[[chunk_id]] <- NULL
  env$assignment[[chunk_id]] <- NULL

  # Track retry count
  chunk$retry_count <- (chunk$retry_count %||% 0L) + 1L
  chunk$last_error <- error

  if (requeue) {
    # Put back in pending queue for retry
    env$pending <- c(env$pending, list(chunk))
  } else {
    # Mark as permanently failed
    chunk$failed_at <- Sys.time()
    env$failed[[chunk_id]] <- chunk
  }

  invisible(NULL)
}

#' Requeue All Chunks for a Worker
#'
#' When a worker dies or is recycled, requeue all its in-flight chunks.
#'
#' @param queue A `shard_queue` object.
#' @param worker_id Worker identifier.
#' @return Integer. Number of chunks requeued.
#' @keywords internal
#' @noRd
queue_requeue_worker <- function(queue, worker_id) {
  env <- queue$env
  requeued <- 0L

  chunk_ids <- names(env$assignment)
  for (chunk_id in chunk_ids) {
    if (identical(env$assignment[[chunk_id]], worker_id)) {
      queue_fail(queue, chunk_id, error = "worker_recycled", requeue = TRUE)
      requeued <- requeued + 1L
    }
  }

  requeued
}

#' Check if Queue is Done
#'
#' @param queue A `shard_queue` object.
#' @return Logical. TRUE if all chunks are completed or failed.
#' @keywords internal
#' @noRd
queue_is_done <- function(queue) {
  env <- queue$env
  length(env$pending) == 0 && length(env$in_flight) == 0
}

#' Check if Queue Has Work
#'
#' @param queue A `shard_queue` object.
#' @return Logical. TRUE if there are pending chunks.
#' @keywords internal
#' @noRd
queue_has_pending <- function(queue) {
  length(queue$env$pending) > 0
}

#' Get Queue Status
#'
#' @param queue A `shard_queue` object.
#' @return A list with queue statistics.
#' @keywords internal
#' @noRd
queue_status <- function(queue) {
  env <- queue$env

  # Count retries
  total_retries <- sum(vapply(
    c(env$completed, env$pending, env$in_flight, env$failed),
    function(x) x$retry_count %||% 0L,
    integer(1)
  ))

  list(
    total = queue$total,
    pending = length(env$pending),
    in_flight = length(env$in_flight),
    completed = length(env$completed),
    failed = length(env$failed),
    total_retries = total_retries
  )
}

#' Get Queue Results
#'
#' @param queue A `shard_queue` object.
#' @return List of results from completed chunks.
#' @keywords internal
#' @noRd
queue_results <- function(queue) {
  env <- queue$env
  completed <- env$completed
  ids <- names(completed)
  if (length(ids) == 0) return(list())

  # Prefer numeric id ordering for predictable, lapply/sapply-like behavior.
  num_ids <- suppressWarnings(as.integer(ids))
  if (!anyNA(num_ids)) {
    completed <- completed[order(num_ids)]
  } else if (!is.null(env$order)) {
    ord_ids <- intersect(env$order, ids)
    completed <- completed[ord_ids]
  }

  lapply(completed, function(x) x$result)
}

#' Get Failed Chunks
#'
#' @param queue A `shard_queue` object.
#' @return List of failed chunk descriptors.
#' @keywords internal
#' @noRd
queue_failures <- function(queue) {
  queue$env$failed
}

#' @param x A \code{shard_queue} object.
#' @param ... Further arguments (ignored).
#' @return The input \code{x}, invisibly.
#' @export
#' @noRd
print.shard_queue <- function(x, ...) {
  status <- queue_status(x)
  cat("shard chunk queue\n")
  cat("  Total chunks:", status$total, "\n")
  cat("  Pending:", status$pending, "\n")
  cat("  In flight:", status$in_flight, "\n")
  cat("  Completed:", status$completed, "\n")
  cat("  Failed:", status$failed, "\n")
  cat("  Total retries:", status$total_retries, "\n")
  invisible(x)
}

Try the shard package in your browser

Any scripts or data that you put into this service are public.

shard documentation built on April 3, 2026, 9:08 a.m.