R/diagnostics.R

Defines functions print_segment_report print_task_report print_copy_report print_cow_report print_mem_report print_main_report print.shard_report segment_report task_report copy_report cow_report mem_report recommendations report

Documented in copy_report cow_report mem_report print.shard_report recommendations report segment_report task_report

#' @title Diagnostics API
#' @name diagnostics
#' @description Comprehensive diagnostics for shard parallel execution, providing
#'   insights into memory usage, worker status, task execution, and shared memory
#'   segments.
#'
#' @details
#' The diagnostics API provides multiple views into shard's runtime behavior:
#' \itemize{
#'   \item \code{report()}: Primary entry point with configurable detail levels
#'   \item \code{mem_report()}: Memory usage across workers
#'   \item \code{cow_report()}: Copy-on-write policy tracking
#'   \item \code{copy_report()}: Data transfer statistics
#'   \item \code{task_report()}: Task/chunk execution statistics
#'   \item \code{segment_report()}: Shared memory segment information
#' }
#'
#' All functions return S3 \code{shard_report} objects with appropriate print
#' methods for human-readable output.
NULL

#' Generate Shard Runtime Report
#'
#' Primary entry point for shard diagnostics. Generates a comprehensive report
#' of the current runtime state including pool status, memory usage, and
#' execution statistics.
#'
#' @param level Character. Detail level for the report:
#'   \itemize{
#'     \item \code{"summary"}: High-level overview (default)
#'     \item \code{"workers"}: Include per-worker details
#'     \item \code{"tasks"}: Include task execution history
#'     \item \code{"segments"}: Include shared memory segment details
#'   }
#' @param result Optional. A \code{shard_result} object from \code{\link{shard_map}}
#'   to include execution diagnostics from.
#'
#' @return An S3 object of class \code{shard_report} containing:
#'   \itemize{
#'     \item \code{level}: The requested detail level
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{pool}: Pool status information (if pool exists)
#'     \item \code{memory}: Memory usage summary
#'     \item \code{workers}: Per-worker details (if level includes workers)
#'     \item \code{tasks}: Task execution details (if level includes tasks)
#'     \item \code{segments}: Segment details (if level includes segments)
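#'     \item \code{result_diagnostics}: Diagnostics from shard_result (only when
#'       \code{result} is provided and \code{level} is "tasks" or "segments")
#'     \item \code{recommendations}: Character vector produced by
#'       \code{\link{recommendations}} (only when \code{result} is provided)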
#'   }
#'
#' @export
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' report(result = res)
#' }
report <- function(level = c("summary", "workers", "tasks", "segments"),
                   result = NULL) {
  # Assemble the top-level `shard_report`. Pool and memory summaries are always
  # attempted; worker, task and segment sections are filled in only when the
  # requested `level` asks for them.
  level <- match.arg(level)

  has_result <- !is.null(result) && inherits(result, "shard_result")
  wants_workers <- level %in% c("workers", "tasks", "segments")
  wants_tasks <- level %in% c("tasks", "segments")
  wants_segments <- level == "segments"

  # Skeleton with every section present (as NULL) so the field order is stable.
  out <- list(
    level = level,
    timestamp = Sys.time(),
    pool = NULL,
    memory = NULL,
    workers = NULL,
    tasks = NULL,
    segments = NULL,
    result_diagnostics = NULL
  )

  # Pool configuration and counters, when a pool currently exists.
  pool <- pool_get()
  pool_present <- !is.null(pool)
  if (pool_present) {
    out$pool <- list(
      n_workers = pool$n,
      rss_limit = pool$rss_limit_bytes,
      drift_threshold = pool$rss_drift_threshold,
      created_at = pool$created_at,
      stats = pool$stats
    )
  }

  # The memory summary is part of every report level.
  out$memory <- mem_report()

  # Per-worker metrics; empty pool slots are reported as "missing".
  if (wants_workers && pool_present) {
    describe_worker <- function(slot) {
      worker <- pool$workers[[slot]]
      if (is.null(worker)) {
        return(list(id = slot, status = "missing"))
      }
      worker_metrics(worker)
    }
    out$workers <- lapply(seq_len(pool$n), describe_worker)
  }

  # Task statistics need a shard_result to read from.
  if (wants_tasks && has_result) {
    out$tasks <- task_report(result)
    out$result_diagnostics <- result$diagnostics
  }

  # Recommendations are attached at every level once a result is supplied.
  if (has_result) {
    out$recommendations <- recommendations(result)
  }

  if (wants_segments) {
    out$segments <- segment_report()
  }

  structure(out, class = "shard_report")
}

#' Performance Recommendations
#'
#' Uses run telemetry (copy/materialization stats, packing volume, buffer/table
#' writes, scratch pool stats) to produce actionable recommendations.
#'
#' @param result A `shard_result` from [shard_map()].
#' @return A character vector of recommendations (possibly empty).
#' @export
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' recommendations(res)
#' }
recommendations <- function(result) {
  # Turn run telemetry into human-readable tuning hints.
  #
  # `result` must be a `shard_result`; anything else raises an error. The
  # return value is a de-duplicated character vector (possibly empty).
  #
  # Telemetry fields are treated as untrusted: a missing, NA, non-scalar or
  # non-numeric value is ignored instead of aborting the whole report.
  if (is.null(result) || !inherits(result, "shard_result")) {
    stop("result must be a shard_result", call. = FALSE)
  }

  # Coerce one telemetry field to a single double; fall back to `default`
  # when the field is absent or not a length-one atomic value.
  num1 <- function(value, default = 0) {
    if (is.null(value) || !is.atomic(value) || length(value) != 1L) {
      return(default)
    }
    suppressWarnings(as.double(value))
  }
  # TRUE only for a finite, strictly positive number (NA-safe for `if`).
  is_pos <- function(v) is.finite(v) && v > 0

  recs <- character(0)

  # Non-list diagnostics are treated as "no diagnostics" so that the `$`
  # lookups below cannot fail on an atomic vector.
  d <- result$diagnostics %||% list()
  if (!is.list(d)) {
    d <- list()
  }

  # Copy/materialization guidance. copy_report() failures are deliberately
  # tolerated: recommendations are best-effort.
  cr <- tryCatch(copy_report(result), error = function(e) NULL)
  if (is.list(cr)) {
    mb <- num1(cr$view_materialized_bytes)
    pb <- num1(cr$view_packed_bytes)
    if (is_pos(mb)) {
      recs <- c(recs, sprintf(
        "Views were materialized (%.1f MB). Prefer view_block()/view_gather() and shard-aware kernels to avoid slice copies.",
        mb / 1024^2
      ))
      hs <- d$view_hotspots
      # Only named hotspot lists can be rendered as "<bytes> @ <key>".
      if (is.list(hs) && length(hs) > 0 && !is.null(names(hs))) {
        keys <- utils::head(names(hs), 3L)
        parts <- vapply(keys, function(k) {
          entry <- hs[[k]]
          b <- if (is.list(entry)) num1(entry$bytes) else 0
          paste0(format_bytes(b), " @ ", k)
        }, character(1), USE.NAMES = FALSE)
        recs <- c(recs, paste0("Top view materialization hotspots: ", paste(parts, collapse = " ; ")))
      }
    }
    if (is_pos(pb)) {
      recs <- c(recs, sprintf(
        "Views required packing (%.1f MB). Prefer contiguous/block shards when possible, or use gather-aware kernels to keep packing bounded.",
        pb / 1024^2
      ))
    }
    bb <- num1(cr$buffer_bytes)
    tb <- num1(cr$table_bytes)
    # Suggest explicit output buffers only when none were used at all.
    if (isTRUE(bb == 0) && isTRUE(tb == 0) && (is_pos(mb) || is_pos(pb))) {
      recs <- c(recs, "Consider writing large outputs into explicit buffers (buffer()/table_buffer()/table_sink()) to avoid master-side gather/concat.")
    }
  }

  # Dispatch overhead guidance (tiny tasks).
  dm <- as.character(d$dispatch_mode %||% "rpc_chunked")
  shards_done <- num1(d$shards_processed, NA_real_)
  chunks_done <- num1(d$chunks_dispatched, NA_real_)
  dur <- num1(d$duration, NA_real_)

  # Heuristic: 1 shard per chunk + very small per-shard time means RPC overhead
  # likely dominates.
  if (identical(dm, "rpc_chunked") &&
      is.finite(dur) && is_pos(shards_done) && is_pos(chunks_done) &&
      chunks_done == shards_done) {
    mean_shard_time <- dur / shards_done
    if (is_pos(mean_shard_time) && mean_shard_time < 0.02) {
      recs <- c(recs, sprintf(
        "Per-shard work looks tiny (%.3f ms/shard). Consider increasing chunk_size, or use profile='speed' / dispatch_mode='shm_queue' for out-buffer workflows to reduce dispatch overhead.",
        mean_shard_time * 1000
      ))
    }
  }

  # shm_queue mode returns a placeholder list of length n_tasks; for extremely
  # large n_tasks, recommend increasing block_size to reduce bookkeeping.
  if (identical(dm, "shm_queue") && is.list(d$shm_queue)) {
    nt <- num1(d$shm_queue$n_tasks, NA_real_)
    if (is.finite(nt) && nt > 1e6) {
      recs <- c(recs, sprintf(
        "shm_queue is tracking %.0f tasks. If bookkeeping dominates, increase dispatch_opts$block_size to reduce n_tasks (you can keep work-per-task small without going all the way to 1).",
        nt
      ))
    }
  }

  # Scratch pool guidance.
  if (is.list(d$scratch_stats)) {
    hw <- num1(d$scratch_stats$high_water)
    if (is_pos(hw)) {
      recs <- c(recs, sprintf(
        "Scratch pool high-water mark: %.1f MB. If allocations churn, consider using scratch_matrix() (and set scratch_pool_config(max_bytes=...)).",
        hw / 1024^2
      ))
    }
  }

  # Scheduler throttling guidance.
  if (is.list(d$scheduler)) {
    throttled <- num1(d$scheduler$throttle_events)
    if (is_pos(throttled)) {
      recs <- c(recs, sprintf(
        "Scheduler throttled heavy chunks %d times. Consider tuning scheduler_policy (e.g., max_huge_concurrency) or reducing per-shard footprint.",
        as.integer(throttled)
      ))
    }
  }

  unique(recs)
}

#' Memory Usage Report
#'
#' Generates a report of memory usage across all workers in the pool.
#'
#' @param pool Optional. A \code{shard_pool} object. If NULL, uses the current pool.
#'
#' @return An S3 object of class \code{shard_report} with type \code{"memory"}
#'   containing:
#'   \itemize{
#'     \item \code{type}: "memory"
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{pool_active}: Whether a pool exists
#'     \item \code{n_workers}: Number of workers
#'     \item \code{rss_limit}: RSS limit per worker (bytes)
#'     \item \code{total_rss}: Sum of RSS across all workers
#'     \item \code{peak_rss}: Highest RSS among workers
#'     \item \code{mean_rss}: Mean RSS across workers
#'     \item \code{workers}: Per-worker RSS details
#'   }
#'
#' @export
#' @examples
#' \donttest{
#' p <- pool_create(2)
#' mem_report(p)
#' pool_stop(p)
#' }
mem_report <- function(pool = NULL) {
  # Snapshot RSS for every worker slot of `pool` (or of the current pool when
  # `pool` is NULL) and summarise total / peak / mean over the readable values.
  pool <- if (is.null(pool)) pool_get() else pool

  out <- list(
    type = "memory",
    timestamp = Sys.time(),
    pool_active = !is.null(pool),
    n_workers = 0L,
    rss_limit = NA_real_,
    total_rss = 0,
    peak_rss = NA_real_,
    mean_rss = NA_real_,
    workers = list()
  )

  # Without a pool there is nothing to measure; return the empty skeleton.
  if (!out$pool_active) {
    return(structure(out, class = "shard_report"))
  }

  out$n_workers <- pool$n
  out$rss_limit <- pool$rss_limit_bytes

  # Describe one worker slot. Empty slots get a "missing" placeholder; RSS is
  # only queried for workers that report as alive.
  snapshot_worker <- function(slot) {
    worker <- pool$workers[[slot]]
    if (is.null(worker)) {
      return(list(
        id = slot,
        pid = NA_integer_,
        status = "missing",
        rss = NA_real_,
        rss_baseline = NA_real_,
        drift = NA_real_
      ))
    }

    is_up <- worker_is_alive(worker)
    current <- if (is_up) worker_rss(worker) else NA_real_
    base <- worker$rss_baseline %||% NA_real_

    # Drift is the relative growth over the baseline; undefined unless both
    # values are known and the baseline is positive.
    can_compare <- !is.na(current) && !is.na(base) && base > 0
    growth <- if (can_compare) (current - base) / base else NA_real_

    list(
      id = slot,
      pid = worker$pid,
      status = if (is_up) "alive" else "dead",
      rss = current,
      rss_baseline = base,
      drift = growth,
      recycle_count = worker$recycle_count %||% 0L
    )
  }

  per_worker <- lapply(seq_len(pool$n), snapshot_worker)
  out$workers <- per_worker

  # Aggregate over the workers whose RSS could actually be read.
  rss_all <- vapply(per_worker, function(entry) entry$rss %||% NA_real_, numeric(1))
  rss_known <- rss_all[!is.na(rss_all)]

  if (length(rss_known) > 0) {
    out$total_rss <- sum(rss_known)
    out$peak_rss <- max(rss_known)
    out$mean_rss <- mean(rss_known)
  }

  structure(out, class = "shard_report")
}

#' Copy-on-Write Policy Report
#'
#' Generates a report of copy-on-write behavior for borrowed inputs.
#'
#' @param result Optional. A \code{shard_result} object to extract COW stats from.
#'
#' @return An S3 object of class \code{shard_report} with type \code{"cow"}
#'   containing:
#'   \itemize{
#'     \item \code{type}: "cow"
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{policy}: The COW policy used ("deny", "audit", "allow")
#'     \item \code{violations}: Count of COW violations detected (audit mode)
#'     \item \code{copies_triggered}: Estimated copies triggered by mutations
#'   }
#'
#' @export
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' cow_report(res)
#' }
cow_report <- function(result = NULL) {
  # Build a "cow" report. Defaults describe "nothing recorded"; they are
  # overwritten from `result` only when it is a shard_result.
  out <- list(
    type = "cow",
    timestamp = Sys.time(),
    policy = NA_character_,
    violations = 0L,
    copies_triggered = 0L,
    details = list()
  )

  if (is.null(result) || !inherits(result, "shard_result")) {
    return(structure(out, class = "shard_report"))
  }

  out$policy <- result$cow_policy %||% NA_character_

  # COW counters live under diagnostics$cow_stats when the run recorded them.
  stats <- result$diagnostics$cow_stats
  if (!is.null(stats)) {
    out$violations <- stats$violations %||% 0L
    out$copies_triggered <- stats$copies %||% 0L
    out$details <- stats$details %||% list()
  }

  structure(out, class = "shard_report")
}

#' Data Copy Report
#'
#' Generates a report of data transfer and copy statistics during parallel
#' execution.
#'
#' @param result Optional. A \code{shard_result} object to extract copy stats from.
#'
#' @return An S3 object of class \code{shard_report} with type \code{"copy"}
#'   containing:
#'   \itemize{
#'     \item \code{type}: "copy"
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{borrow_exports}: Number of borrowed input exports
#'     \item \code{borrow_bytes}: Total bytes in borrowed inputs
#'     \item \code{result_imports}: Number of result imports
#'     \item \code{result_bytes}: Estimated bytes in results
#'     \item \code{buffer_writes}: Number of buffer write operations
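#'     \item \code{buffer_bytes}: Total bytes written to buffers
#'     \item \code{table_writes}, \code{table_rows}, \code{table_bytes}: Table
#'       write statistics taken from the result's \code{table_stats} diagnostics
#'     \item \code{view_created}, \code{view_materialized},
#'       \code{view_materialized_bytes}, \code{view_packed},
#'       \code{view_packed_bytes}: View statistics
#'     \item \code{view_materialize_hotspots}: List of view materialization
#'       hotspots taken from the result's \code{view_hotspots} diagnostics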
#'   }
#'
#' @export
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' copy_report(res)
#' }
copy_report <- function(result = NULL) {
  # Build a "copy" report describing data movement for a run.
  #
  # `result` is optional; when it is not a shard_result every counter keeps
  # its zero default, except that view counters may still be filled from the
  # global view diagnostics (see the fallback at the end).
  rpt <- list(
    type = "copy",
    timestamp = Sys.time(),
    borrow_exports = 0L,
    borrow_bytes = 0,
    result_imports = 0L,
    result_bytes = 0,
    buffer_writes = 0L,
    buffer_bytes = 0,
    table_writes = 0L,
    table_rows = 0L,
    table_bytes = 0,
    view_created = 0L,
    view_materialized = 0L,
    view_materialized_bytes = 0,
    view_materialize_hotspots = list(),
    view_packed = 0L,
    view_packed_bytes = 0
  )

  have_result <- !is.null(result) && inherits(result, "shard_result")

  # Diagnostics are only trusted when they are a list; anything else is
  # treated as "no diagnostics" so `$` lookups below cannot fail.
  diag <- if (have_result && is.list(result$diagnostics)) result$diagnostics else NULL

  # Tracks whether run-level view stats were found. The zero defaults above
  # are never NULL, so a NULL check on the report fields cannot detect this.
  have_run_view_stats <- FALSE

  if (have_result) {
    # Count results
    if (!is.null(result$results)) {
      if (inherits(result$results, c("shard_results_placeholder", "shard_row_groups", "shard_dataset", "shard_table_handle"))) {
        # shm_queue results are not gathered; avoid reporting placeholder size.
        rpt$result_imports <- 0L
        rpt$result_bytes <- 0
      } else {
        rpt$result_imports <- length(result$results)
        # Estimate result size; sizing failures degrade to 0.
        rpt$result_bytes <- tryCatch(
          as.numeric(utils::object.size(result$results)),
          error = function(e) 0
        )
      }
    }

    # Extract copy diagnostics if available
    copy_stats <- diag$copy_stats
    if (!is.null(copy_stats)) {
      rpt$borrow_exports <- copy_stats$borrow_exports %||% 0L
      rpt$borrow_bytes <- copy_stats$borrow_bytes %||% 0
      rpt$buffer_writes <- copy_stats$buffer_writes %||% 0L
      rpt$buffer_bytes <- copy_stats$buffer_bytes %||% 0
    }

    table_stats <- diag$table_stats
    if (!is.null(table_stats)) {
      rpt$table_writes <- table_stats$writes %||% 0L
      rpt$table_rows <- table_stats$rows %||% 0L
      rpt$table_bytes <- table_stats$bytes %||% 0
    }

    # Prefer run-level view stats when present (aggregated from workers).
    view_stats <- diag$view_stats
    if (!is.null(view_stats)) {
      have_run_view_stats <- TRUE
      rpt$view_created <- view_stats$created %||% 0L
      rpt$view_materialized <- view_stats$materialized %||% 0L
      rpt$view_materialized_bytes <- view_stats$materialized_bytes %||% 0
      rpt$view_packed <- view_stats$packed %||% 0L
      rpt$view_packed_bytes <- view_stats$packed_bytes %||% 0
    }
    if (is.list(diag$view_hotspots)) {
      rpt$view_materialize_hotspots <- diag$view_hotspots
    }
  }

  if (!have_run_view_stats) {
    # Fallback to global view diagnostics when run-level aggregation isn't
    # available. Best-effort: any failure leaves the zero defaults in place.
    vd <- tryCatch(view_diagnostics(), error = function(e) NULL)
    if (is.list(vd)) {
      rpt$view_created <- vd$created %||% 0L
      rpt$view_materialized <- vd$materialized %||% 0L
      rpt$view_materialized_bytes <- vd$materialized_bytes %||% 0
      rpt$view_packed <- vd$packed %||% 0L
      rpt$view_packed_bytes <- vd$packed_bytes %||% 0
    }
  }

  structure(rpt, class = "shard_report")
}

#' Task Execution Report
#'
#' Generates a report of task/chunk execution statistics from a shard_map result.
#'
#' @param result A \code{shard_result} object from \code{\link{shard_map}}.
#'
#' @return An S3 object of class \code{shard_report} with type \code{"task"}
#'   containing:
#'   \itemize{
#'     \item \code{type}: "task"
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{shards_total}: Total number of shards
#'     \item \code{shards_processed}: Number of shards successfully processed
#'     \item \code{shards_failed}: Number of permanently failed shards
#'     \item \code{chunks_dispatched}: Number of chunk batches dispatched
#'     \item \code{total_retries}: Total number of retry attempts
#'     \item \code{duration}: Total execution duration (seconds)
#'     \item \code{throughput}: Shards processed per second
#'     \item \code{queue_status}: Final queue status
#'   }
#'
#' @export
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' task_report(res)
#' }
task_report <- function(result = NULL) {
  # Summarise shard/chunk execution for a shard_result. Any other input
  # (including NULL) yields the all-defaults report.
  out <- list(
    type = "task",
    timestamp = Sys.time(),
    shards_total = 0L,
    shards_processed = 0L,
    shards_failed = 0L,
    chunks_dispatched = 0L,
    total_retries = 0L,
    duration = NA_real_,
    throughput = NA_real_,
    queue_status = NULL,
    health_checks = list()
  )

  usable <- !is.null(result) && inherits(result, "shard_result")
  if (!usable) {
    return(structure(out, class = "shard_report"))
  }

  # Total shard count comes from the shards descriptor.
  shard_desc <- result$shards
  if (!is.null(shard_desc)) {
    out$shards_total <- shard_desc$num_shards %||% 0L
  }

  # Progress counters and timing come from the run diagnostics.
  run_diag <- result$diagnostics
  if (!is.null(run_diag)) {
    out$shards_processed <- run_diag$shards_processed %||% 0L
    out$chunks_dispatched <- run_diag$chunks_dispatched %||% 0L
    out$duration <- run_diag$duration %||% NA_real_
    out$health_checks <- run_diag$health_checks %||% list()
  }

  # Failure and retry counts come from the final queue status.
  queue <- result$queue_status
  if (!is.null(queue)) {
    out$queue_status <- queue
    out$shards_failed <- queue$failed %||% 0L
    out$total_retries <- queue$total_retries %||% 0L
  }

  # The recorded failures may outnumber the queue's count; keep the larger.
  recorded_failures <- result$failures
  if (!is.null(recorded_failures)) {
    out$shards_failed <- max(out$shards_failed, length(recorded_failures))
  }

  # Throughput is only defined for a positive duration with work done.
  elapsed <- out$duration
  done <- out$shards_processed
  if (!is.na(elapsed) && elapsed > 0 && done > 0) {
    out$throughput <- done / elapsed
  }

  structure(out, class = "shard_report")
}

#' Shared Memory Segment Report
#'
#' Generates a report of active shared memory segments in the current session.
#'
#' @return An S3 object of class \code{shard_report} with type \code{"segment"}
#'   containing:
#'   \itemize{
#'     \item \code{type}: "segment"
#'     \item \code{timestamp}: When the report was generated
#'     \item \code{n_segments}: Number of tracked segments
#'     \item \code{total_bytes}: Total bytes across all segments
#'     \item \code{segments}: List of segment details
#'   }
#'
#' @details
#' This function reports on segments that are currently accessible. Note that
#' segments are automatically cleaned up when their R objects are garbage
#' collected, so this only shows segments with live references.
#'
#' @export
#' @examples
#' \donttest{
#' segment_report()
#' }
segment_report <- function() {
  # Build a "segment" report. The segment list and byte totals stay empty;
  # only the backing/platform summary is populated.
  out <- list(
    type = "segment",
    timestamp = Sys.time(),
    n_segments = 0L,
    total_bytes = 0,
    segments = list(),
    backing_summary = list()
  )

  backing <- list()

  # Backings supported here; if the lookup fails, report "mmap" only.
  backing$available <- tryCatch(
    available_backings(),
    error = function(e) c("mmap")
  )

  # Note: We can't enumerate all segments because they're tracked by
  # individual R objects. This provides platform information instead.
  backing$platform <- .Platform$OS.type
  backing$is_windows <- tryCatch(
    is_windows(),
    error = function(e) .Platform$OS.type == "windows"
  )

  out$backing_summary <- backing

  structure(out, class = "shard_report")
}

#' Print a shard_report Object
#'
#' @param x A \code{shard_report} object.
#' @param ... Ignored.
#' @return The input \code{x}, invisibly.
#' @examples
#' \donttest{
#' res <- shard_map(shards(100, workers = 2), function(s) sum(s$idx), workers = 2)
#' pool_stop()
#' rpt <- report(result = res)
#' print(rpt)
#' }
#' @export
print.shard_report <- function(x, ...) {
  # Route to the printer matching the report's `type`. Reports without a
  # type, or with an unrecognised one, use the main report layout.
  kind <- x$type

  if (is.null(kind)) {
    print_main_report(x)
  } else {
    switch(kind,
      "memory" = print_mem_report(x),
      "cow" = print_cow_report(x),
      "copy" = print_copy_report(x),
      "task" = print_task_report(x),
      "segment" = print_segment_report(x),
      print_main_report(x)
    )
  }

  # Print methods hand their input back invisibly.
  invisible(x)
}

#' Print Main Report
#' @param x A shard_report object.
#' @keywords internal
#' @noRd
print_main_report <- function(x) {
  # Render the combined report section by section. Sections whose data is
  # absent are skipped, except the pool section which prints a placeholder.

  # Map a worker status string to the one-character marker used in listings.
  icon_for <- function(status) {
    switch(status %||% "unknown",
      "alive" = "+",
      "ok" = "+",
      "dead" = "x",
      "missing" = "?",
      "?"
    )
  }

  cat("shard_report (", x$level, ")\n", sep = "")
  cat("Generated:", format(x$timestamp), "\n")

  # Pool summary
  pool <- x$pool
  if (is.null(pool)) {
    cat("\nPool: (not active)\n")
  } else {
    cat("\nPool:\n")
    cat("  Workers:", pool$n_workers, "\n")
    cat("  RSS limit:", format_bytes(pool$rss_limit), "\n")
    cat("  Drift threshold:", sprintf("%.0f%%", pool$drift_threshold * 100), "\n")
    pool_stats <- pool$stats
    if (!is.null(pool_stats)) {
      cat("  Stats: ", pool_stats$total_tasks, " tasks, ",
          pool_stats$total_recycles, " recycles, ",
          pool_stats$total_deaths, " deaths\n", sep = "")
    }
  }

  # Memory summary
  mem <- x$memory
  if (!is.null(mem) && inherits(mem, "shard_report")) {
    cat("\nMemory:\n")
    if (mem$pool_active) {
      cat("  Total RSS:", format_bytes(mem$total_rss), "\n")
      cat("  Peak RSS:", format_bytes(mem$peak_rss), "\n")
      cat("  Mean RSS:", format_bytes(mem$mean_rss), "\n")
    } else {
      cat("  (no pool active)\n")
    }
  }

  # Worker details: entries may use either id/rss or worker_id/rss_bytes, so
  # both spellings are accepted.
  worker_list <- x$workers
  if (!is.null(worker_list) && length(worker_list) > 0) {
    cat("\nWorkers:\n")
    for (entry in worker_list) {
      cat(sprintf("  [%s] %d: pid=%s, rss=%s, drift=%.1f%%, recycles=%d\n",
        icon_for(entry$status),
        entry$id %||% entry$worker_id %||% NA,
        entry$pid %||% "NA",
        format_bytes(entry$rss %||% entry$rss_bytes %||% NA_real_),
        (entry$drift %||% 0) * 100,
        entry$recycle_count %||% 0
      ))
    }
  }

  # Task details
  tasks <- x$tasks
  if (!is.null(tasks) && inherits(tasks, "shard_report")) {
    cat("\nTasks:\n")
    cat("  Total:", tasks$shards_total, "\n")
    cat("  Processed:", tasks$shards_processed, "\n")
    if (tasks$shards_failed > 0) {
      cat("  Failed:", tasks$shards_failed, "\n")
    }
    if (!is.na(tasks$duration)) {
      cat("  Duration:", sprintf("%.2f seconds", tasks$duration), "\n")
    }
    if (!is.na(tasks$throughput)) {
      cat("  Throughput:", sprintf("%.1f shards/sec", tasks$throughput), "\n")
    }
  }

  # Segment details
  segs <- x$segments
  if (!is.null(segs) && inherits(segs, "shard_report")) {
    cat("\nSegments:\n")
    cat("  Available backings:", paste(segs$backing_summary$available, collapse = ", "), "\n")
    cat("  Platform:", segs$backing_summary$platform, "\n")
  }

  # Recommendations, one bullet per entry
  recs <- x$recommendations
  if (!is.null(recs) && length(recs) > 0) {
    cat("\nRecommendations:\n")
    for (rec in recs) {
      cat("  - ", rec, "\n", sep = "")
    }
  }
}

#' Print Memory Report
#' @param x A shard_report object with type "memory".
#' @keywords internal
#' @noRd
print_mem_report <- function(x) {
  # Render a "memory" report: header, aggregate RSS figures, then one line
  # per worker. With no active pool only a short notice is printed.

  # Map a worker status string to the one-character marker used in listings.
  icon_for <- function(status) {
    switch(status %||% "unknown",
      "alive" = "+",
      "ok" = "+",
      "dead" = "x",
      "missing" = "?",
      "?"
    )
  }

  cat("shard memory report\n")
  cat("Generated:", format(x$timestamp), "\n")

  if (!x$pool_active) {
    cat("\nNo pool active.\n")
    return(invisible(x))
  }

  cat("\nPool: ", x$n_workers, " workers\n", sep = "")
  cat("RSS limit:", format_bytes(x$rss_limit), "\n")

  cat("\nAggregate:\n")
  cat("  Total:", format_bytes(x$total_rss), "\n")
  cat("  Peak:", format_bytes(x$peak_rss), "\n")
  cat("  Mean:", format_bytes(x$mean_rss), "\n")

  cat("\nPer-worker:\n")
  for (entry in x$workers) {
    worker_line <- sprintf("  [%s] %d: rss=%s, baseline=%s, drift=%.1f%%\n",
      icon_for(entry$status),
      entry$id,
      format_bytes(entry$rss %||% NA_real_),
      format_bytes(entry$rss_baseline %||% NA_real_),
      (entry$drift %||% 0) * 100
    )
    cat(worker_line)
  }
}

#' Print COW Report
#' @param x A shard_report object with type "cow".
#' @keywords internal
#' @noRd
print_cow_report <- function(x) {
  # Render a "cow" report: policy, violation/copy counters and, when present,
  # one bullet per detail entry.
  cat("shard copy-on-write report\n")
  cat("Generated:", format(x$timestamp), "\n")

  # cow_report() stores an unknown policy as NA_character_, not NULL, so a
  # NULL-only fallback would print "NA". Treat NULL, empty and NA alike.
  policy <- x$policy
  if (is.null(policy) || length(policy) == 0L || all(is.na(policy))) {
    policy <- "(unknown)"
  }

  cat("\nPolicy:", policy, "\n")
  cat("Violations:", x$violations, "\n")
  cat("Copies triggered:", x$copies_triggered, "\n")

  if (length(x$details) > 0) {
    cat("\nDetails:\n")
    for (d in x$details) {
      cat("  -", d, "\n")
    }
  }
}

#' Print Copy Report
#' @param x A shard_report object with type "copy".
#' @keywords internal
#' @noRd
print_copy_report <- function(x) {
  # Render a "copy" report as labelled sections. The table and view sections
  # are printed only when the report carries at least one of their fields.

  # Emit one "label value" line, matching cat()'s default space separator.
  emit <- function(label, value) cat(label, value, "\n")

  cat("shard data copy report\n")
  emit("Generated:", format(x$timestamp))

  cat("\nBorrowed inputs:\n")
  emit("  Exports:", x$borrow_exports)
  emit("  Bytes:", format_bytes(x$borrow_bytes))

  cat("\nResults:\n")
  emit("  Imports:", x$result_imports)
  emit("  Bytes:", format_bytes(x$result_bytes))

  cat("\nBuffers:\n")
  emit("  Writes:", x$buffer_writes)
  emit("  Bytes:", format_bytes(x$buffer_bytes))

  has_tables <- !is.null(x$table_writes) || !is.null(x$table_rows)
  if (has_tables) {
    cat("\nTables:\n")
    emit("  Writes:", x$table_writes %||% 0L)
    emit("  Rows:", x$table_rows %||% 0L)
    emit("  Bytes:", format_bytes(x$table_bytes %||% 0))
  }

  has_views <- !is.null(x$view_created) || !is.null(x$view_materialized)
  if (has_views) {
    cat("\nViews:\n")
    emit("  Created:", x$view_created %||% 0L)
    emit("  Materialized:", x$view_materialized %||% 0L)
    emit("  Materialized bytes:", format_bytes(x$view_materialized_bytes %||% 0))
    has_packing <- !is.null(x$view_packed) || !is.null(x$view_packed_bytes)
    if (has_packing) {
      emit("  Packed:", x$view_packed %||% 0L)
      emit("  Packed bytes:", format_bytes(x$view_packed_bytes %||% 0))
    }
  }
}

#' Print Task Report
#' @param x A shard_report object with type "task".
#' @keywords internal
#' @noRd
print_task_report <- function(x) {
  # Render a "task" report: execution counters, timing, and a health-check
  # tally. Retries, duration and throughput are shown only when meaningful.

  # Emit one "label value" line, matching cat()'s default space separator.
  emit <- function(label, value) cat(label, value, "\n")

  cat("shard task report\n")
  emit("Generated:", format(x$timestamp))

  cat("\nExecution:\n")
  emit("  Total shards:", x$shards_total)
  emit("  Processed:", x$shards_processed)
  emit("  Failed:", x$shards_failed)
  emit("  Chunks dispatched:", x$chunks_dispatched)

  if (x$total_retries > 0) {
    emit("  Retries:", x$total_retries)
  }

  cat("\nTiming:\n")
  if (!is.na(x$duration)) {
    emit("  Duration:", sprintf("%.2f seconds", x$duration))
  }
  if (!is.na(x$throughput)) {
    emit("  Throughput:", sprintf("%.1f shards/sec", x$throughput))
  }

  n_checks <- length(x$health_checks)
  if (n_checks > 0) {
    cat("\nHealth checks:", n_checks, "performed\n")
  }
}

#' Print Segment Report
#' @param x A shard_report object with type "segment".
#' @keywords internal
#' @noRd
print_segment_report <- function(x) {
  # Render a "segment" report: the available backings and platform flags
  # stored in the report's backing summary.
  backing <- x$backing_summary

  cat("shard segment report\n")
  cat("Generated:", format(x$timestamp), "\n")

  cat("\nBacking types:\n")
  cat("  Available:", paste(backing$available, collapse = ", "), "\n")
  cat("  Platform:", backing$platform, "\n")
  cat("  Windows:", backing$is_windows, "\n")
}

Try the shard package in your browser

Any scripts or data that you put into this service are public.

shard documentation built on April 3, 2026, 9:08 a.m.