R/03-prevalence.R

Defines functions .wilson_ci .stratified_var_lin .stratified_var_ht .ps_estimator .hajek_estimator .ht_estimator surv_naive_prevalence surv_lineage_prevalence

Documented in surv_lineage_prevalence surv_naive_prevalence .wilson_ci

# ============================================================
# Design-weighted lineage prevalence estimation
# ============================================================

#' Estimate lineage prevalence with design weights
#'
#' Estimates the prevalence of a specified pathogen lineage over time,
#' correcting for unequal sequencing rates across strata.
#'
#' @param design A `surv_design` object.
#' @param lineage Character. Target lineage name.
#' @param time Character. Time aggregation: `"epiweek"`, `"month"`,
#'   `"date"`, or a column name. Default `"epiweek"`.
#' @param method Character. Estimation method: `"hajek"` (default),
#'   `"horvitz_thompson"`, or `"poststratified"`.
#' @param conf_level Numeric. Confidence level. Default 0.95.
#' @param min_obs Integer. Minimum observations per time period.
#'   Default 5.
#'
#' @return A `surv_prevalence` object.
#'
#' @examples
#' sim <- surv_simulate(n_regions = 3, n_weeks = 10, seed = 1)
#' d <- surv_design(sim$sequences, ~ region,
#'                  sim$population[c("region", "seq_rate")], sim$population)
#' prev <- surv_lineage_prevalence(d, "BA.2.86")
#' print(prev)
#'
#' @export
surv_lineage_prevalence <- function(design,
                                    lineage,
                                    time = "epiweek",
                                    method = c("hajek", "horvitz_thompson",
                                               "poststratified"),
                                    conf_level = 0.95,
                                    min_obs = 5L) {
  .assert_surv_design(design)
  checkmate::assert_string(lineage)
  method <- match.arg(method)
  checkmate::assert_number(conf_level, lower = 0.5, upper = 0.999)
  checkmate::assert_count(min_obs, positive = TRUE)

  # Validate lineage exists, suggest closest match if not
  available_lineages <- unique(design$data[[design$col_lineage]])
  if (!lineage %in% available_lineages) {
    distances <- utils::adist(lineage, available_lineages)[1, ]
    closest <- available_lineages[which.min(distances)]
    avail_str <- paste(utils::head(sort(available_lineages), 10), collapse = ", ")
    cli::cli_warn(c(
      "Lineage {.val {lineage}} not found in data.",
      "i" = "Available lineages: {avail_str}",
      "i" = "Did you mean {.val {closest}}?"
    ))
  }

  dat <- .map_weights_to_obs(design)
  resolved <- .resolve_time_column(dat, time, design$col_date_collected)
  dat <- resolved$data
  time_col <- resolved$col

  dat$.is_target <- as.integer(dat[[design$col_lineage]] == lineage)
  z_alpha <- stats::qnorm(1 - (1 - conf_level) / 2)

  time_vals <- sort(unique(dat[[time_col]]))

  estimates <- purrr::map_dfr(time_vals, function(t) {
    d <- dat[dat[[time_col]] == t, , drop = FALSE]
    if (nrow(d) < min_obs) {
      return(tibble::tibble(
        time = t, lineage = lineage, n_obs = nrow(d),
        n_target = sum(d$.is_target), prevalence = NA_real_,
        se = NA_real_, ci_lower = NA_real_, ci_upper = NA_real_,
        effective_n = NA_real_, flag = "insufficient_obs"
      ))
    }

    result <- switch(method,
      "horvitz_thompson" = .ht_estimator(d, design$strata_vars),
      "hajek"            = .hajek_estimator(d, design$strata_vars),
      "poststratified"   = .ps_estimator(d, design$strata_vars, design$strata_info)
    )

    tibble::tibble(
      time = t, lineage = lineage, n_obs = nrow(d),
      n_target = sum(d$.is_target),
      prevalence = result$estimate, se = result$se,
      ci_lower = .wilson_ci(result$estimate, result$effective_n, z_alpha, "lower"),
      ci_upper = .wilson_ci(result$estimate, result$effective_n, z_alpha, "upper"),
      effective_n = result$effective_n, flag = "ok"
    )
  })

  new_surv_prevalence(estimates, design, method, lineage, conf_level, time)
}


#' Compute naive (unweighted) lineage prevalence
#'
#' Simple proportion without design correction. Useful as baseline.
#'
#' @param design A `surv_design` object.
#' @param lineage Character. Target lineage.
#' @param time Character. Time aggregation. Default `"epiweek"`.
#' @param conf_level Numeric. Default 0.95.
#'
#' @return A `surv_prevalence` object with `method = "naive"`.
#'
#' @examples
#' sim <- surv_simulate(n_regions = 3, n_weeks = 10, seed = 1)
#' d <- surv_design(sim$sequences, ~ region,
#'                  sim$population[c("region", "seq_rate")], sim$population)
#' naive <- surv_naive_prevalence(d, "BA.2.86")
#'
#' @export
surv_naive_prevalence <- function(design, lineage, time = "epiweek",
                                  conf_level = 0.95) {
  .assert_surv_design(design)
  checkmate::assert_string(lineage)

  available_lineages <- unique(design$data[[design$col_lineage]])
  if (!lineage %in% available_lineages) {
    distances <- utils::adist(lineage, available_lineages)[1, ]
    closest <- available_lineages[which.min(distances)]
    avail_str <- paste(utils::head(sort(available_lineages), 10), collapse = ", ")
    cli::cli_warn(c(
      "Lineage {.val {lineage}} not found in data.",
      "i" = "Available lineages: {avail_str}",
      "i" = "Did you mean {.val {closest}}?"
    ))
  }

  dat <- design$data
  resolved <- .resolve_time_column(dat, time, design$col_date_collected)
  dat <- resolved$data
  time_col <- resolved$col
  lin_col <- design$col_lineage

  z_alpha <- stats::qnorm(1 - (1 - conf_level) / 2)
  time_vals <- sort(unique(dat[[time_col]]))

  estimates <- purrr::map_dfr(time_vals, function(t) {
    d <- dat[dat[[time_col]] == t, , drop = FALSE]
    n <- nrow(d)
    n_t <- sum(d[[lin_col]] == lineage)
    p <- n_t / max(n, 1L)
    se_val <- sqrt(p * (1 - p) / max(n, 1L))
    tibble::tibble(
      time = t, lineage = lineage, n_obs = n, n_target = n_t,
      prevalence = p, se = se_val,
      ci_lower = .wilson_ci(p, n, z_alpha, "lower"),
      ci_upper = .wilson_ci(p, n, z_alpha, "upper"),
      effective_n = as.double(n), flag = "ok"
    )
  })

  new_surv_prevalence(estimates, design, "naive", lineage, conf_level, time)
}


# ---- Internal estimators ----

#' @keywords internal
.ht_estimator <- function(d, strata_vars) {
  w <- d$weight
  y <- d$.is_target
  estimate <- sum(w * y) / sum(w)
  se <- .stratified_var_ht(d, strata_vars, y, w, estimate)
  eff_n <- .kish_eff_n(w)
  list(estimate = estimate, se = se, effective_n = eff_n)
}

#' @keywords internal
.hajek_estimator <- function(d, strata_vars) {
  w <- d$weight
  y <- d$.is_target
  denom <- sum(w)
  estimate <- sum(w * y) / denom
  residuals <- w * (y - estimate) / denom
  se <- .stratified_var_lin(d, strata_vars, residuals)
  eff_n <- .kish_eff_n(w)
  list(estimate = estimate, se = se, effective_n = eff_n)
}

#' @keywords internal
.ps_estimator <- function(d, strata_vars, strata_info) {
  stratum_prev <- d |>
    dplyr::group_by(dplyr::across(dplyr::all_of(strata_vars))) |>
    dplyr::summarise(
      n_h = dplyr::n(), y_h = sum(.data$.is_target),
      p_h = .data$y_h / .data$n_h, .groups = "drop"
    )

  pop_col <- if ("pop_total" %in% names(strata_info)) "pop_total"
             else if ("n_positive" %in% names(strata_info)) "n_positive"
             else NULL

  if (!is.null(pop_col)) {
    stratum_prev <- dplyr::left_join(
      stratum_prev,
      strata_info[c(strata_vars, pop_col)],
      by = strata_vars
    )
    total_pop <- sum(strata_info[[pop_col]], na.rm = TRUE)
    stratum_prev$pi_h <- stratum_prev[[pop_col]] / total_pop
  } else {
    stratum_prev$pi_h <- 1 / nrow(stratum_prev)
  }

  estimate <- sum(stratum_prev$pi_h * stratum_prev$p_h)
  var_est <- sum(
    stratum_prev$pi_h^2 *
    stratum_prev$p_h * (1 - stratum_prev$p_h) /
    pmax(stratum_prev$n_h - 1, 1)
  )
  se <- sqrt(var_est)
  eff_n <- 1 / sum(stratum_prev$pi_h^2 / stratum_prev$n_h)
  list(estimate = estimate, se = se, effective_n = eff_n)
}


# ---- Variance helpers ----

#' @keywords internal
.stratified_var_ht <- function(d, strata_vars, y, w, estimate) {
  d$.y <- y
  d$.w <- w
  vc <- d |>
    dplyr::group_by(dplyr::across(dplyr::all_of(strata_vars))) |>
    dplyr::summarise(
      n_h = dplyr::n(),
      f_h = 1 / mean(.data$.w),
      s2_h = if (dplyr::n() > 1L) stats::var(.data$.w * .data$.y) else 0,
      .groups = "drop"
    ) |>
    dplyr::mutate(v_h = .data$n_h * (1 - .data$f_h) / pmax(.data$n_h - 1, 1) * .data$s2_h)
  sqrt(sum(vc$v_h)) / sum(w)
}

#' @keywords internal
.stratified_var_lin <- function(d, strata_vars, residuals) {
  d$.resid <- residuals
  ve <- d |>
    dplyr::group_by(dplyr::across(dplyr::all_of(strata_vars))) |>
    dplyr::summarise(
      n_h = dplyr::n(),
      v_h = .data$n_h / pmax(.data$n_h - 1, 1) *
            sum((.data$.resid - mean(.data$.resid))^2),
      .groups = "drop"
    )
  sqrt(sum(ve$v_h))
}


# ---- Wilson confidence interval ----

#' Wilson score interval for a proportion
#'
#' Provides valid coverage even at p-hat = 0 or 1, unlike Wald.
#' Reference: Wilson (1927) JASA; Agresti & Coull (1998) TAS.
#'
#' @param p Estimated proportion.
#' @param n Effective sample size.
#' @param z Critical value (e.g., qnorm(0.975) for 95% CI).
#' @param side Character: "lower" or "upper".
#' @return Scalar bound, clamped to 0--1.
#' @keywords internal
.wilson_ci <- function(p, n, z, side = c("lower", "upper")) {
  side <- match.arg(side)
  if (is.na(p) || is.na(n) || n <= 0) return(NA_real_)
  denom <- 1 + z^2 / n
  center <- (p + z^2 / (2 * n)) / denom
  margin <- z * sqrt(p * (1 - p) / n + z^2 / (4 * n^2)) / denom
  if (side == "lower") {
    return(max(0, center - margin))
  } else {
    return(min(1, center + margin))
  }
}

Try the survinger package in your browser

Any scripts or data that you put into this service are public.

survinger documentation built on April 27, 2026, 9:10 a.m.