R/dispatch.R

Defines functions print.shard_dispatch_result pool_sapply pool_lapply dispatch_chunks

Documented in dispatch_chunks pool_lapply pool_sapply print.shard_dispatch_result

#' @title Task Dispatch Engine
#' @description Orchestrates chunk dispatch with worker supervision and failure handling.
#' @name dispatch
NULL

#' Dispatch Chunks to Worker Pool
#'
#' Executes a function over chunks using the worker pool with supervision.
#' Handles worker death and recycling transparently by requeuing failed chunks.
#'
#' @param chunks List of chunk descriptors. Each chunk will be passed to `fun`.
#' @param fun Function to execute. Receives (chunk, ...) as arguments.
#' @param ... Additional arguments passed to `fun`.
#' @param pool A `shard_pool` object. If NULL, uses the current pool.
#' @param health_check_interval Integer. Check pool health every N chunks (default 10).
#' @param max_retries Integer. Maximum retries per chunk before permanent failure (default 3).
#' @param timeout Numeric. Seconds to wait for each chunk (default 3600).
#' @param scheduler_policy Optional list of scheduling hints (advanced). Currently:
#'   - `max_huge_concurrency`: cap concurrent chunks with `footprint_class=="huge"`.
#' @param on_result Optional callback (advanced). If provided, called on the
#'   master process as `on_result(tag, value, worker_id)` for each successful
#'   chunk completion. Used by [shard_reduce()] to stream reductions.
#' @param store_results Logical (advanced). If FALSE, successful chunk values are
#'   not retained in the returned `results` list (streaming use cases).
#' @param retain_chunks Logical (advanced). If FALSE, completed chunk descriptors
#'   are stored minimally (avoids retaining large shard lists in memory).
#'
#' @return A \code{shard_dispatch_result} object with results and diagnostics.
#' @export
#' @examples
#' \donttest{
#' pool_create(2)
#' chunks <- list(list(id = 1L, x = 1), list(id = 2L, x = 2))
#' result <- dispatch_chunks(chunks, function(chunk) chunk$x * 2, pool = pool_get())
#' pool_stop()
#' }
dispatch_chunks <- function(chunks, fun, ...,
                            pool = NULL,
                            health_check_interval = 10L,
                            max_retries = 3L,
                            timeout = 3600,
                            scheduler_policy = NULL,
                            on_result = NULL,
                            store_results = TRUE,
                            retain_chunks = TRUE) {
  if (is.null(pool)) {
    pool <- pool_get()
  }

  if (is.null(pool)) {
    stop("No active pool. Call pool_create() first.", call. = FALSE)
  }

  # Capture additional arguments as a list
  extra_args <- list(...)

  # Avoid `parallel:::` usage (R CMD check will warn). We still rely on
  # parallel's internal protocol, but obtain the function dynamically.
  parallel_sendCall <- utils::getFromNamespace("sendCall", "parallel")

  # Create queue
  queue <- queue_create(chunks)

  # Track diagnostics
  diag <- list(
    start_time = Sys.time(),
    health_checks = list(),
    worker_assignments = list(),
    scheduler = list(throttle_events = 0L)
  )

  # Export fun + args to workers once to avoid per-chunk re-export overhead.
  # Each worker runs a small wrapper via sendCall/recv that references these globals.
  for (i in seq_len(pool$n)) {
    w <- pool$workers[[i]]
    if (!is.null(w) && worker_is_alive(w)) {
      export_env <- new.env(parent = emptyenv())
      export_env$.shard_dispatch_fun <- fun
      export_env$.shard_dispatch_args <- extra_args
      parallel::clusterExport(w$cluster, c(".shard_dispatch_fun", ".shard_dispatch_args"),
                              envir = export_env)
    }
  }

  dispatch_wrapper <- function(chunk) {
    vd0 <- tryCatch(view_diagnostics(), error = function(e) NULL)
    cd0 <- tryCatch(buffer_diagnostics(), error = function(e) NULL)
    td0 <- tryCatch(table_diagnostics(), error = function(e) NULL)
    sd0 <- tryCatch(scratch_diagnostics(), error = function(e) NULL)
    res <- tryCatch(
      {
        f <- get(".shard_dispatch_fun", envir = globalenv(), inherits = FALSE)
        a <- get(".shard_dispatch_args", envir = globalenv(), inherits = FALSE)
        list(ok = TRUE, value = do.call(f, c(list(chunk), a)))
      },
      error = function(e) list(ok = FALSE, error = conditionMessage(e))
    )
    vd1 <- tryCatch(view_diagnostics(), error = function(e) NULL)
    cd1 <- tryCatch(buffer_diagnostics(), error = function(e) NULL)
    td1 <- tryCatch(table_diagnostics(), error = function(e) NULL)
    sd1 <- tryCatch(scratch_diagnostics(), error = function(e) NULL)

    if (is.list(vd0) && is.list(vd1)) {
      res$view_delta <- list(
        created = (vd1$created %||% 0L) - (vd0$created %||% 0L),
        materialized = (vd1$materialized %||% 0L) - (vd0$materialized %||% 0L),
        materialized_bytes = (vd1$materialized_bytes %||% 0) - (vd0$materialized_bytes %||% 0),
        packed = (vd1$packed %||% 0L) - (vd0$packed %||% 0L),
        packed_bytes = (vd1$packed_bytes %||% 0) - (vd0$packed_bytes %||% 0)
      )
    }

    # Best-effort attribution: only compute hotspot deltas when materialization occurred.
    if (is.list(res$view_delta) && (res$view_delta$materialized %||% 0L) > 0L) {
      hs1 <- tryCatch(view_materialize_hotspots_snapshot_(), error = function(e) NULL)
      if (is.list(hs1)) {
        hs0 <- if (exists(".shard_view_hotspot_snapshot", envir = .shard_worker_env, inherits = FALSE)) {
          get(".shard_view_hotspot_snapshot", envir = .shard_worker_env)
        } else {
          list()
        }

        delta <- list()
        keys <- unique(c(names(hs0), names(hs1)))
        for (k in keys) {
          v1 <- hs1[[k]] %||% list()
          v0 <- hs0[[k]] %||% list()
          b1 <- v1$bytes %||% 0
          c1 <- v1$count %||% 0L
          b0 <- v0$bytes %||% 0
          c0 <- v0$count %||% 0L
          db <- as.double(b1) - as.double(b0)
          dc <- as.integer(c1) - as.integer(c0)
          if (is.finite(db) && db > 0) {
            delta[[k]] <- list(bytes = db, count = dc)
          }
        }

        assign(".shard_view_hotspot_snapshot", hs1, envir = .shard_worker_env)
        if (length(delta) > 0) res$view_hotspots_delta <- delta
      }
    }

    if (is.list(cd0) && is.list(cd1)) {
      res$copy_delta <- list(
        buffer_writes = (cd1$writes %||% 0L) - (cd0$writes %||% 0L),
        buffer_bytes = (cd1$bytes %||% 0) - (cd0$bytes %||% 0)
      )
    }

    if (is.list(td0) && is.list(td1)) {
      res$table_delta <- list(
        table_writes = (td1$writes %||% 0L) - (td0$writes %||% 0L),
        table_rows = (td1$rows %||% 0L) - (td0$rows %||% 0L),
        table_bytes = (td1$bytes %||% 0) - (td0$bytes %||% 0)
      )
    }

    if (is.list(sd0) && is.list(sd1)) {
      res$scratch_delta <- list(
        hits = (sd1$hits %||% 0L) - (sd0$hits %||% 0L),
        misses = (sd1$misses %||% 0L) - (sd0$misses %||% 0L),
        high_water = sd1$high_water %||% 0
      )
      res$scratch_needs_recycle <- isTRUE(sd1$needs_recycle)
    }

    res
  }

  # Async scheduling state
  idle <- rep(TRUE, pool$n)
  inflight <- vector("list", pool$n) # worker_id -> {chunk_id,start_time}
  inflight_class <- rep(NA_character_, pool$n)

  chunks_processed <- 0L
  view_stats <- list(created = 0L, materialized = 0L, materialized_bytes = 0,
                     packed = 0L, packed_bytes = 0)
  view_hotspots <- list()
  copy_stats <- list(borrow_exports = 0L, borrow_bytes = 0, buffer_writes = 0L, buffer_bytes = 0)
  table_stats <- list(writes = 0L, rows = 0L, bytes = 0)
  scratch_stats <- list(hits = 0L, misses = 0L, high_water = 0)

  policy <- scheduler_policy %||% list()
  max_huge <- as.integer(policy$max_huge_concurrency %||% NA_integer_)
  if (!is.na(max_huge) && max_huge < 1L) max_huge <- 1L

  chunk_class <- function(ch) {
    (ch$footprint_class %||% "tiny")
  }

  huge_inflight <- function() {
    sum(inflight_class == "huge", na.rm = TRUE)
  }

  # Helper: receive a single result (non-blocking with small timeout) from any worker.
  recv_one <- function(timeout_sec = 0.1) {
    socklist <- list()
    idx_map <- integer(0)
    for (i in seq_along(pool$workers)) {
      w <- pool$workers[[i]]
      if (!is.null(w) && worker_is_alive(w)) {
        n <- w$cluster[[1]]
        if (!is.null(n) && !is.null(n$con)) {
          if (isOpen(n$con)) {
            socklist[[length(socklist) + 1L]] <- n$con
            idx_map <- c(idx_map, i)
          }
        }
      }
    }

    if (length(socklist) == 0) return(NULL)

    # socketSelect() is provided by base R (works for socket connections).
    ready <- socketSelect(socklist, timeout = timeout_sec)
    if (length(ready) == 0 || !any(ready)) return(NULL)

    # socketSelect is documented to return a logical vector (like parallel uses),
    # but be defensive if it returns indices on some platforms.
    if (is.logical(ready)) {
      n_local <- which.max(ready)
    } else if (is.numeric(ready)) {
      # Some platforms return a 0/1 numeric vector; others may return indices.
      if (length(ready) == length(socklist)) {
        n_local <- which.max(ready)
      } else {
        n_local <- as.integer(ready[1])
      }
    } else {
      return(NULL)
    }

    if (is.na(n_local) || n_local < 1L || n_local > length(socklist)) {
      return(NULL)
    }
    worker_id <- idx_map[n_local]

    msg <- tryCatch(
      unserialize(socklist[[n_local]]),
      error = function(e) list(tag = NA_character_, value = list(ok = FALSE, error = conditionMessage(e)),
                               .recv_error = TRUE)
    )
    list(worker_id = worker_id, tag = msg$tag, value = msg$value, recv_error = isTRUE(msg$.recv_error))
  }

  # Main dispatch loop: keep all workers busy and collect results as they finish.
  while (!queue_is_done(queue)) {
    # Health check at intervals (based on completions)
    if (chunks_processed > 0 && chunks_processed %% health_check_interval == 0) {
      busy_ids <- which(!idle)
      health <- pool_health_check(pool, busy_workers = busy_ids)
      diag$health_checks <- c(diag$health_checks, list(health))

      # Refresh pool reference after potential recycle/restart.
      pool <- pool_get()

      # Requeue any in-flight chunks from recycled/restarted workers
      for (action in health$worker_actions) {
        if (action$action %in% c("restart", "recycle")) {
          requeued <- queue_requeue_worker(queue, action$worker_id)
          if (requeued > 0) {
            message(sprintf("Requeued %d chunks from worker %d (%s)",
                            requeued, action$worker_id, action$reason))
          }
          idle[action$worker_id] <- TRUE
          inflight[action$worker_id] <- list(NULL)
          inflight_class[action$worker_id] <- NA_character_

          # Ensure the replacement worker has the dispatch globals.
          w_new <- pool$workers[[action$worker_id]]
          if (!is.null(w_new)) {
            export_env <- new.env(parent = emptyenv())
            export_env$.shard_dispatch_fun <- fun
            export_env$.shard_dispatch_args <- extra_args
            parallel::clusterExport(w_new$cluster,
                                    c(".shard_dispatch_fun", ".shard_dispatch_args"),
                                    envir = export_env)
          }
        }
      }
    }

    # Fill idle workers
    for (worker_id in which(idle)) {
      if (!queue_has_pending(queue)) break

      pred <- NULL
      if (!is.na(max_huge) && huge_inflight() >= max_huge) {
        pred <- function(ch) chunk_class(ch) != "huge"
      }

      chunk <- queue_next_where(queue, worker_id, pred)
      if (is.null(chunk)) {
        # If we couldn't find a non-huge chunk under throttling, leave the
        # worker idle and wait for an in-flight huge task to finish.
        if (!is.null(pred)) {
          diag$scheduler$throttle_events <- diag$scheduler$throttle_events + 1L
        }
        next
      }

      w <- pool$workers[[worker_id]]
      if (is.null(w) || !worker_is_alive(w)) {
        pool <- pool_restart_worker_(pool, worker_id)
        w <- pool$workers[[worker_id]]

        # Re-export globals to the restarted worker
        export_env <- new.env(parent = emptyenv())
        export_env$.shard_dispatch_fun <- fun
        export_env$.shard_dispatch_args <- extra_args
        parallel::clusterExport(w$cluster, c(".shard_dispatch_fun", ".shard_dispatch_args"),
                                envir = export_env)
      }

      # If this worker was flagged for recycle while busy, do it now (safe point).
      if (isTRUE(w$needs_recycle)) {
        min_age <- pool$min_recycle_interval %||% 0
        age <- as.numeric(difftime(Sys.time(), w$spawned_at %||% Sys.time(), units = "secs"))
        if (!is.na(age) && is.finite(min_age) && age < min_age) {
          # Cooldown: avoid thrashing PSOCK creation/destruction under extremely
          # tight rss_limit settings.
          # We'll keep processing and recycle on a later safe point.
          w$needs_recycle <- TRUE
        } else {
          pool$workers[[worker_id]] <- worker_recycle(w, pool$init_expr, pool$packages, dev_path = pool$dev_path)
          pool$workers[[worker_id]]$rss_baseline <- worker_rss(pool$workers[[worker_id]])
          pool$workers[[worker_id]]$needs_recycle <- FALSE
          pool$stats$total_recycles <- pool$stats$total_recycles + 1L
          .pool_env$pool <- pool
          w <- pool$workers[[worker_id]]

          export_env <- new.env(parent = emptyenv())
          export_env$.shard_dispatch_fun <- fun
          export_env$.shard_dispatch_args <- extra_args
          parallel::clusterExport(w$cluster, c(".shard_dispatch_fun", ".shard_dispatch_args"),
                                  envir = export_env)
        }
      }

      # Async send
      parallel_sendCall(w$cluster[[1]], fun = dispatch_wrapper, args = list(chunk),
                        return = TRUE, tag = as.character(chunk$id))
      idle[worker_id] <- FALSE
      inflight[[worker_id]] <- list(chunk_id = as.character(chunk$id), start_time = Sys.time())
      inflight_class[worker_id] <- chunk_class(chunk)
      pool$stats$total_tasks <- pool$stats$total_tasks + 1L
      .pool_env$pool <- pool
    }

    # Handle timeouts for in-flight chunks
    if (any(!idle)) {
      now <- Sys.time()
      for (worker_id in which(!idle)) {
        meta <- inflight[[worker_id]]
        if (is.null(meta)) next
        age <- as.numeric(difftime(now, meta$start_time, units = "secs"))
        if (!is.na(age) && age > timeout) {
          # Kill and restart the worker; requeue or permanently fail the chunk.
          chunk_id <- meta$chunk_id
          ch <- queue$env$in_flight[[chunk_id]]
          retry_count <- (ch$retry_count %||% 0L)
          requeue <- retry_count < max_retries
          queue_fail(queue, chunk_id, error = "timeout", requeue = requeue)
          if (!requeue) {
            warning(sprintf("Chunk %s permanently failed after %d retries: %s",
                            chunk_id, retry_count, "timeout"))
          }

          pool <- pool_restart_worker_(pool, worker_id, graceful = FALSE)

          # Re-export globals to the restarted worker
          export_env <- new.env(parent = emptyenv())
          export_env$.shard_dispatch_fun <- fun
          export_env$.shard_dispatch_args <- extra_args
          parallel::clusterExport(pool$workers[[worker_id]]$cluster,
                                  c(".shard_dispatch_fun", ".shard_dispatch_args"),
                                  envir = export_env)

          idle[worker_id] <- TRUE
          inflight[worker_id] <- list(NULL)
          inflight_class[worker_id] <- NA_character_
        }
      }
    }

    # Receive at most one completed chunk per loop iteration.
    rec <- recv_one(timeout_sec = 0.1)
    if (is.null(rec)) {
      if (!queue_has_pending(queue) && any(!idle)) {
        Sys.sleep(0.01)
      }
      next
    }

    worker_id <- rec$worker_id
    tag <- rec$tag
    payload <- rec$value

    if (isTRUE(rec$recv_error)) {
      # Treat a receive/unserialize failure as a worker death while running.
      meta <- inflight[[worker_id]]
      chunk_id <- meta$chunk_id %||% as.character(tag)
      if (!is.null(chunk_id) && !is.na(chunk_id)) {
        ch <- queue$env$in_flight[[as.character(chunk_id)]]
        retry_count <- (ch$retry_count %||% 0L)
        requeue <- retry_count < max_retries
        queue_fail(queue, as.character(chunk_id), error = payload$error %||% "recv error", requeue = requeue)
        if (!requeue) {
          warning(sprintf("Chunk %s permanently failed after %d retries: %s",
                          as.character(chunk_id), retry_count, payload$error %||% "recv error"))
        }
      }

      pool <- pool_restart_worker_(pool, worker_id, graceful = FALSE)

      export_env <- new.env(parent = emptyenv())
      export_env$.shard_dispatch_fun <- fun
      export_env$.shard_dispatch_args <- extra_args
      parallel::clusterExport(pool$workers[[worker_id]]$cluster,
                              c(".shard_dispatch_fun", ".shard_dispatch_args"),
                              envir = export_env)

      idle[worker_id] <- TRUE
      inflight[worker_id] <- list(NULL)
      inflight_class[worker_id] <- NA_character_
      next
    }

    idle[worker_id] <- TRUE
    inflight[worker_id] <- list(NULL)
    inflight_class[worker_id] <- NA_character_

    if (is.list(payload) && isTRUE(payload$ok)) {
      val <- payload$value
      if (!is.null(on_result)) {
        if (!is.function(on_result)) stop("on_result must be a function or NULL", call. = FALSE)
        # If the reducer throws, fail fast: continuing would silently corrupt results.
        on_result(tag = tag, value = val, worker_id = worker_id)
      }
      queue_complete(queue, tag, if (isTRUE(store_results)) val else NULL, retain = retain_chunks)
    } else {
      err <- if (is.list(payload)) payload$error else "unknown worker error"
      ch <- queue$env$in_flight[[as.character(tag)]]
      retry_count <- (ch$retry_count %||% 0L)
      if (retry_count >= max_retries) {
        queue_fail(queue, tag, error = err, requeue = FALSE)
        warning(sprintf("Chunk %s permanently failed after %d retries: %s",
                        tag, retry_count, err))
      } else {
        queue_fail(queue, tag, error = err, requeue = TRUE)
      }
    }

    if (is.list(payload) && is.list(payload$view_delta)) {
      vd <- payload$view_delta
      view_stats$created <- view_stats$created + (vd$created %||% 0L)
      view_stats$materialized <- view_stats$materialized + (vd$materialized %||% 0L)
      view_stats$materialized_bytes <- view_stats$materialized_bytes + (vd$materialized_bytes %||% 0)
      view_stats$packed <- view_stats$packed + (vd$packed %||% 0L)
      view_stats$packed_bytes <- view_stats$packed_bytes + (vd$packed_bytes %||% 0)
    }

    if (is.list(payload) && is.list(payload$view_hotspots_delta)) {
      hsd <- payload$view_hotspots_delta
      for (k in names(hsd)) {
        cur <- view_hotspots[[k]] %||% list(bytes = 0, count = 0L)
        cur$bytes <- (cur$bytes %||% 0) + (hsd[[k]]$bytes %||% 0)
        cur$count <- as.integer((cur$count %||% 0L) + (hsd[[k]]$count %||% 0L))
        view_hotspots[[k]] <- cur
      }
    }

    if (is.list(payload) && is.list(payload$copy_delta)) {
      cd <- payload$copy_delta
      copy_stats$buffer_writes <- copy_stats$buffer_writes + (cd$buffer_writes %||% 0L)
      copy_stats$buffer_bytes <- copy_stats$buffer_bytes + (cd$buffer_bytes %||% 0)
    }

    if (is.list(payload) && is.list(payload$table_delta)) {
      td <- payload$table_delta
      table_stats$writes <- table_stats$writes + (td$table_writes %||% 0L)
      table_stats$rows <- table_stats$rows + (td$table_rows %||% 0L)
      table_stats$bytes <- table_stats$bytes + (td$table_bytes %||% 0)
    }

    if (is.list(payload) && is.list(payload$scratch_delta)) {
      sd <- payload$scratch_delta
      scratch_stats$hits <- scratch_stats$hits + (sd$hits %||% 0L)
      scratch_stats$misses <- scratch_stats$misses + (sd$misses %||% 0L)
      scratch_stats$high_water <- max(as.double(scratch_stats$high_water), as.double(sd$high_water %||% 0))
    }

    if (isTRUE(payload$scratch_needs_recycle)) {
      # Request a recycle at the next safe point (before the worker is reused).
      w <- pool$workers[[worker_id]]
      if (!is.null(w)) {
        w$needs_recycle <- TRUE
        pool$workers[[worker_id]] <- w
        .pool_env$pool <- pool
      }
    }

    chunks_processed <- chunks_processed + 1L
  }

  diag$end_time <- Sys.time()
  diag$duration <- as.numeric(difftime(diag$end_time, diag$start_time, units = "secs"))
  diag$view_stats <- view_stats
  if (length(view_hotspots) > 0) {
    ord <- order(vapply(view_hotspots, function(x) as.double(x$bytes %||% 0), numeric(1)), decreasing = TRUE)
    view_hotspots <- view_hotspots[ord]
    if (length(view_hotspots) > 20) view_hotspots <- view_hotspots[seq_len(20)]
    diag$view_hotspots <- view_hotspots
  } else {
    diag$view_hotspots <- list()
  }
  diag$copy_stats <- copy_stats
  diag$table_stats <- table_stats
  diag$scratch_stats <- scratch_stats

  structure(
    list(
      results = queue_results(queue),
      failures = queue_failures(queue),
      queue_status = queue_status(queue),
      diagnostics = diag,
      pool_stats = pool_get()$stats
    ),
    class = "shard_dispatch_result"
  )
}

#' Parallel Dispatch with Async Workers
#'
#' An alternative dispatch that uses parallel::parLapply-style execution
#' but with supervision. This is a simpler interface for basic parallel apply.
#'
#' @param X List or vector to iterate over.
#' @param FUN Function to apply to each element.
#' @param ... Additional arguments to FUN.
#' @param pool A `shard_pool` object. If NULL, uses current pool.
#' @param chunk_size Integer. Elements per chunk (default 1).
#'
#' @return A list of results.
#' @export
#' @examples
#' \donttest{
#' pool_create(2)
#' result <- pool_lapply(1:4, function(x) x^2, pool = pool_get())
#' pool_stop()
#' }
pool_lapply <- function(X, FUN, ..., pool = NULL, chunk_size = 1L) {
  if (is.null(pool)) {
    pool <- pool_get()
  }

  if (is.null(pool)) {
    stop("No active pool. Call pool_create() first.", call. = FALSE)
  }

  # Capture extra args
  extra_args <- list(...)

  # Create chunks with the actual function embedded
  n <- length(X)
  chunks <- lapply(seq_len(ceiling(n / chunk_size)), function(i) {
    start <- (i - 1L) * chunk_size + 1L
    end <- min(i * chunk_size, n)
    list(
      id = i,
      indices = start:end,
      elements = X[start:end],
      FUN = FUN,
      extra_args = extra_args
    )
  })

  # Wrapper function that processes a chunk
  chunk_fun <- function(chunk) {
    FUN <- chunk$FUN
    extra_args <- chunk$extra_args
    lapply(chunk$elements, function(x) do.call(FUN, c(list(x), extra_args)))
  }

  # Dispatch
  result <- dispatch_chunks(chunks, chunk_fun, pool = pool)

  # Flatten results
  unlist(result$results, recursive = FALSE)
}

#' Parallel sapply with Supervision
#'
#' @param X List or vector to iterate over.
#' @param FUN Function to apply.
#' @param ... Additional arguments to FUN.
#' @param simplify Logical. Simplify result to vector/matrix?
#' @param pool A `shard_pool` object. If NULL, uses current pool.
#'
#' @return Simplified result if possible, otherwise a list.
#' @export
#' @examples
#' \donttest{
#' pool_create(2)
#' result <- pool_sapply(1:4, function(x) x^2, pool = pool_get())
#' pool_stop()
#' }
pool_sapply <- function(X, FUN, ..., simplify = TRUE, pool = NULL) {
  result <- pool_lapply(X, FUN, ..., pool = pool)

  if (simplify) {
    tryCatch(
      simplify2array(result),
      error = function(e) result
    )
  } else {
    result
  }
}

#' Print a shard_dispatch_result Object
#'
#' @param x A \code{shard_dispatch_result} object.
#' @param ... Further arguments (ignored).
#' @return The input \code{x}, invisibly.
#' @export
#' @examples
#' \donttest{
#' pool_create(2)
#' chunks <- list(list(id = 1L, x = 1), list(id = 2L, x = 2))
#' result <- dispatch_chunks(chunks, function(chunk) chunk$x, pool = pool_get())
#' print(result)
#' pool_stop()
#' }
print.shard_dispatch_result <- function(x, ...) {
  cat("shard dispatch result\n")
  cat("  Duration:", sprintf("%.2f seconds", x$diagnostics$duration), "\n")

  status <- x$queue_status
  cat("  Chunks: ", status$completed, " completed",
      if (status$failed > 0) paste0(", ", status$failed, " failed") else "",
      if (status$total_retries > 0) paste0(" (", status$total_retries, " retries)") else "",
      "\n", sep = "")

  cat("  Pool stats: ", x$pool_stats$total_recycles, " recycles, ",
      x$pool_stats$total_deaths, " deaths\n", sep = "")

  if (length(x$failures) > 0) {
    cat("  Failed chunks:", paste(names(x$failures), collapse = ", "), "\n")
  }

  invisible(x)
}

Try the shard package in your browser

Any scripts or data that you put into this service are public.

shard documentation built on April 3, 2026, 9:08 a.m.