inst/scripts/workflow_update.R

#!/usr/bin/env Rscript

# Workflow Orchestration Script for Automated NHANES Data Updates
# This script is designed to run in GitHub Actions but can also be run locally
#
# Purpose:
#   1. Pull fresh data from CDC servers for all configured datasets
#   2. Detect which datasets have changed using MD5 checksums
#   3. Upload only changed datasets to Cloudflare R2 bucket
#   4. Update checksums file for version tracking
#   5. Generate summary report of updates
#
# Environment Variables Required (for R2 upload):
#   - R2_ACCOUNT_ID
#   - R2_ACCESS_KEY_ID
#   - R2_SECRET_ACCESS_KEY
#
# Usage:
#   Rscript inst/scripts/workflow_update.R [--dry-run] [--datasets demo,bpx,...]
#
# Options:
#   --dry-run     Skip R2 upload (testing mode)
#   --datasets    Comma-separated list of specific datasets (default: all from config)

# Source internal data processing functions
# These are not part of the user-facing package API and live in inst/scripts/
source("inst/scripts/pull_nhanes.R")  # Provides pull_nhanes() and helpers

# Parse command line arguments
args <- commandArgs(trailingOnly = TRUE)
dry_run <- "--dry-run" %in% args
specific_datasets <- NULL

# Extract dataset list if provided
dataset_arg_idx <- which(grepl("^--datasets", args))
if (length(dataset_arg_idx) > 0) {
  dataset_arg <- args[dataset_arg_idx]
  if (grepl("=", dataset_arg)) {
    specific_datasets <- strsplit(
      sub("^--datasets=", "", dataset_arg),
      ","
    )[[1]]
  } else if (length(args) > dataset_arg_idx) {
    specific_datasets <- strsplit(args[dataset_arg_idx + 1], ",")[[1]]
  }
}

# Load the nhanesdata package
suppressPackageStartupMessages({
  library(nhanesdata)
  library(cli)
})

# Print startup banner
cli_h1("NHANES Data Workflow Update")
cli_alert_info("Mode: {if(dry_run) 'DRY RUN (no R2 upload)' else 'PRODUCTION'}")
cli_alert_info("Date: {Sys.time()}")
cli_rule()

# Initialize summary tracking
summary <- list(
  start_time = Sys.time(),
  datasets_processed = 0,
  datasets_changed = 0,
  datasets_unchanged = 0,
  datasets_failed = 0,
  datasets_uploaded = 0,
  datasets_skipped_cycles = 0,
  changed_datasets = character(0),
  failed_datasets = character(0),
  skipped_cycle_details = list()
)

# Load dataset configuration
cli_h2("Loading dataset configuration")
tryCatch(
  {
    # Load dataset config from YAML
    if (!requireNamespace("yaml", quietly = TRUE)) {
      stop(
        "Package 'yaml' is required. Install with: install.packages('yaml')",
        call. = FALSE
      )
    }
    config_file <- "inst/extdata/datasets.yml"
    if (!file.exists(config_file)) {
      stop(sprintf("Configuration file not found: %s", config_file))
    }
    config_raw <- yaml::read_yaml(config_file)
    config <- do.call(rbind, lapply(config_raw$datasets, function(x) {
      data.frame(
        name = x$name,
        description = x$description,
        category = x$category,
        notes = ifelse(is.null(x$notes), NA_character_, x$notes),
        stringsAsFactors = FALSE
      )
    }))

    cli_alert_success("Loaded {nrow(config)} datasets from configuration")
  },
  error = function(e) {
    cli_alert_danger("Failed to load dataset configuration: {e$message}")
    quit(status = 1)
  }
)

# Filter to specific datasets if requested
if (!is.null(specific_datasets)) {
  config <- config[config$name %in% specific_datasets, ]
  ds_list <- paste(specific_datasets, collapse = ", ")
  cli_alert_info(
    "Filtered to {nrow(config)} specific datasets: {ds_list}"
  )
}

# Validate R2 credentials (unless dry run)
if (!dry_run) {
  cli_h2("Validating R2 credentials")
  required_vars <- c("R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY")
  missing_vars <- required_vars[!sapply(required_vars, function(x) nzchar(Sys.getenv(x)))]

  if (length(missing_vars) > 0) {
    mv_list <- paste(missing_vars, collapse = ", ")
    cli_alert_danger(
      "Missing required environment variables: {mv_list}"
    )
    cli_alert_info("Set these in GitHub Secrets or your local environment")
    quit(status = 1)
  }
  cli_alert_success("All R2 credentials found")
}

# Process each dataset
cli_h2("Processing datasets")
cli_rule()

for (i in seq_len(nrow(config))) {
  dataset_name <- config$name[i]
  dataset_desc <- config$description[i]

  cli_h3("{i}/{nrow(config)}: {toupper(dataset_name)}")
  cli_alert_info("Description: {dataset_desc}")

  summary$datasets_processed <- summary$datasets_processed + 1

  # Step 1: Pull data from CDC
  cli_alert("Pulling data from CDC servers...")

  dataset_obj <- tryCatch(
    {
      pull_nhanes(dataset_name, save = TRUE)
    },
    error = function(e) {
      cli_alert_danger("Failed to pull {dataset_name}: {e$message}")
      summary$datasets_failed <- summary$datasets_failed + 1
      summary$failed_datasets <- c(summary$failed_datasets, dataset_name)
      NULL
    }
  )

  if (is.null(dataset_obj)) {
    cli_rule()
    next
  }

  cli_alert_success("Downloaded {scales::comma(nrow(dataset_obj))} rows")

  # Check for skipped cycles (transient CDC API failures)
  skipped <- attr(dataset_obj, "skipped_cycles")
  if (!is.null(skipped) && length(skipped) > 0) {
    summary$datasets_skipped_cycles <- summary$datasets_skipped_cycles + 1
    summary$skipped_cycle_details[[dataset_name]] <- skipped
    cli_alert_danger(paste0(
      "SKIPPING upload for {dataset_name}: ",
      "{length(skipped)} cycle(s) failed after retries: ",
      "{paste(skipped, collapse = ', ')}"
    ))
    cli_alert_warning(paste0(
      "Data may be incomplete. Will not overwrite R2 ",
      "with potentially missing cycles."
    ))
    cli_rule()
    next
  }

  # Step 2: Check if data has changed
  parquet_path <- sprintf("data/raw/parquet/%s.parquet", dataset_name)
  checksums_file <- ".checksums.json"

  cli_alert("Checking for changes...")
  # Detect data changes via checksum comparison
  tools_ok <- requireNamespace("tools", quietly = TRUE)
  json_ok <- requireNamespace("jsonlite", quietly = TRUE)
  if (!tools_ok || !json_ok) {
    stop(
      "Packages 'tools' and 'jsonlite' are required. ",
      "Install with: install.packages(c('tools', 'jsonlite'))",
      call. = FALSE
    )
  }

  has_changed <- if (!file.exists(parquet_path)) {
    warning(sprintf("File not found: %s", parquet_path))
    FALSE
  } else {
    new_hash <- tools::md5sum(parquet_path)
    names(new_hash) <- NULL

    checksums <- if (file.exists(checksums_file)) {
      jsonlite::read_json(checksums_file, simplifyVector = TRUE)
    } else {
      list()
    }

    stored_hash <- checksums[[dataset_name]]

    if (is.null(stored_hash)) {
      message(sprintf("%s: NEW dataset (no previous checksum)", dataset_name))
      TRUE
    } else if (new_hash != stored_hash) {
      message(sprintf("%s: CHANGED (hash mismatch)", dataset_name))
      TRUE
    } else {
      message(sprintf("%s: UNCHANGED (hash match)", dataset_name))
      FALSE
    }
  }

  if (has_changed) {
    summary$datasets_changed <- summary$datasets_changed + 1
    summary$changed_datasets <- c(summary$changed_datasets, dataset_name)

    # Step 3: Upload to R2 (if not dry run)
    if (!dry_run) {
      cli_alert("Uploading to R2 bucket...")

      upload_success <- tryCatch(
        {
          nhanesdata:::nhanes_r2_upload(
            x = dataset_obj,
            name = dataset_name,
            bucket = "nhanes-data"
          )
          TRUE
        },
        error = function(e) {
          cli_alert_danger("R2 upload failed: {conditionMessage(e)}")
          summary$datasets_failed <- summary$datasets_failed + 1
          summary$failed_datasets <- c(summary$failed_datasets, dataset_name)
          FALSE
        }
      )

      if (upload_success) {
        summary$datasets_uploaded <- summary$datasets_uploaded + 1
        cli_alert_success("Uploaded to R2")

        # Step 4: Update checksum for this dataset
        if (!file.exists(parquet_path)) {
          stop(sprintf("File not found: %s", parquet_path))
        }

        new_hash <- tools::md5sum(parquet_path)
        names(new_hash) <- NULL

        checksums <- if (file.exists(checksums_file)) {
          jsonlite::read_json(checksums_file, simplifyVector = TRUE)
        } else {
          list()
        }

        checksums[[dataset_name]] <- new_hash
        checksums <- checksums[sort(names(checksums))]

        jsonlite::write_json(
          checksums,
          checksums_file,
          pretty = TRUE,
          auto_unbox = TRUE
        )

        message(sprintf("Updated checksum for %s", dataset_name))
      }
    } else {
      cli_alert_warning("SKIPPED upload (dry run mode)")
    }
  } else {
    summary$datasets_unchanged <- summary$datasets_unchanged + 1
    cli_alert_info("No changes detected - skipping upload")
  }

  cli_rule()
}

# Generate summary report
end_time <- Sys.time()
duration_mins <- difftime(end_time, summary$start_time, units = "mins")

# Convert to JSON-serializable types
summary$start_time <- format(summary$start_time, "%Y-%m-%d %H:%M:%S")
summary$end_time <- format(end_time, "%Y-%m-%d %H:%M:%S")
# Explicitly strip difftime class by unlassing first, then converting to numeric
# This ensures all S3 class attributes are removed before JSON serialization
summary$duration_minutes <- as.numeric(unclass(duration_mins))

cli_h1("Workflow Summary")
cli_alert_info("Duration: {round(duration_mins, 2)} minutes")
cli_alert_info("Total datasets processed: {summary$datasets_processed}")
cli_alert_success("Changed datasets: {summary$datasets_changed}")
cli_alert_info("Unchanged datasets: {summary$datasets_unchanged}")

if (!dry_run) {
  cli_alert_success("Uploaded to R2: {summary$datasets_uploaded}")
}

if (summary$datasets_skipped_cycles > 0) {
  cli_alert_warning(
    "Skipped upload (missing cycles): {summary$datasets_skipped_cycles}"
  )
  for (ds in names(summary$skipped_cycle_details)) {
    cycles <- summary$skipped_cycle_details[[ds]]
    cli_alert_warning(
      "  {ds}: missing {paste(cycles, collapse = ', ')}"
    )
  }
}

if (summary$datasets_failed > 0) {
  cli_alert_danger("Failed datasets: {summary$datasets_failed}")
  cli_alert_warning(
    "Failed: {paste(summary$failed_datasets, collapse = ', ')}"
  )
}

# Print changed datasets
if (length(summary$changed_datasets) > 0) {
  cli_h2("Changed Datasets")
  for (ds in summary$changed_datasets) {
    cli_li(ds)
  }
}

# Save summary as JSON for GitHub Actions artifact
summary_json <- jsonlite::toJSON(summary, pretty = TRUE, auto_unbox = FALSE)
writeLines(summary_json, "workflow_summary.json")
cli_alert_success("Summary saved to workflow_summary.json")

# Exit with appropriate status
if (summary$datasets_failed > 0) {
  cli_alert_warning("Workflow completed with failures")
  quit(status = 1)
} else if (summary$datasets_skipped_cycles > 0) {
  cli_alert_warning(paste0(
    "Workflow completed but {summary$datasets_skipped_cycles} ",
    "dataset(s) skipped due to missing CDC cycles"
  ))
  quit(status = 1)
} else {
  cli_alert_success("Workflow completed successfully")
  quit(status = 0)
}

Try the nhanesdata package in your browser

Any scripts or data that you put into this service are public.

nhanesdata documentation built on March 1, 2026, 1:06 a.m.