R/FULL_DATABASE.R

Defines functions: bind_data, resume_build_database, build_database, build_one_year, prep_index, remove_groups, remove_batch, get_batch_names, get_batch_counts, get_batchfile, create_batchfiles, split_into_batches, split_into_groups, split_index

###---------------------------------------------------
###   BUILD FULL DATABASE
###---------------------------------------------------


#' Split Index into Groups
#'
#' Divides the URLs for one tax year into smaller groups of a specified
#' size and saves them as a BATCHFILE in the YEAR subfolder.
#'
#' @param year The tax year to process; a YEAR subfolder is created for it.
#' @param index A data frame containing a column `URL` with URLs to be split.
#' @param group.size An integer specifying the maximum size of each group.
#'
#' @return Invisibly returns the named list of URL groups, which is also
#'   saved as YEAR/BATCHFILE.RDS.
#' @examples
#' groups <- split_index( 2020, index, group.size = 1000 )
#' @export
split_index <- function( year, index, group.size = 25) {
  index <- prep_index( years=year, index )
  urls <- index[["URL"]]
  batchfile <- split_into_groups( urls, G=group.size )
  # batchfile[["COMPLETE"]] <- character()
  dir.create( as.character(year), showWarnings=FALSE )
  saveRDS( batchfile, paste0(year,"/BATCHFILE.RDS") )
  return(invisible(batchfile))
}


#' Split Vector into Groups
#'
#' Divides a vector into smaller groups of a specified size.
#'
#' @param x A vector of items to be split.
#' @param G An integer specifying the maximum size of each group (default: 25).
#'
#' @return A named list where each element is a vector of length up to `G`.
#' @export
split_into_groups <- function( x, G = 25) {

  # Split x into groups of size G
  groups <- split(x, ceiling(seq_along(x) / G))
  
  # Name groups "G1{25}", "G2{25}", ... : zero-padded group
  # number plus the group size in braces
  n_groups   <- length(groups)
  max_width  <- nchar(n_groups)
  num_groups <- stringr::str_pad( 1:n_groups, width=max_width, side="left", pad="0" )
  len_groups <- sapply( groups, length )
  names(groups) <- paste0( "G", num_groups, "{", len_groups, "}" )

  return(groups)
}
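
# A minimal sketch of the naming scheme (illustrative only, not package
# code): sixty items with G = 25 produce three groups, each named with a
# zero-padded group number and the group size in braces.
#
#   g <- split_into_groups( letters[1:60], G = 25 )
#   names(g)
#   #> "G1{25}" "G2{25}" "G3{10}"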


#' Split Groups into Batches
#'
#' Organizes a list of groups into batches for parallel processing.
#'
#' @param glist A list of grouped items, typically from `split_into_groups()`.
#' @param numcores An integer specifying the number of groups per batch,
#'   typically the number of CPU cores so that each core receives one group.
#'
#' @return A named list of batches, each containing a subset of groups.
#' @export
split_into_batches <- function( glist, numcores ) {

  # Split groups into batches of numcores groups each
  batch_list <- split( glist, ceiling(seq_along(glist) / numcores))
  
  # Rename batches as "BATCH{N}" with zero-padded numbers
  n_batches <- length(batch_list)
  max_width <- nchar(n_batches)
  batch_num <- stringr::str_pad( 1:n_batches, width=max_width, side="left", pad="0" )
  names(batch_list) <- paste0( "BATCH", batch_num )
  
  return(batch_list)
}
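
# A minimal sketch of the batching step (illustrative only): five groups
# distributed two-per-batch yield three batches, the last one partial.
#
#   g <- split_into_groups( letters[1:125], G = 25 )   # 5 groups
#   b <- split_into_batches( g, numcores = 2 )
#   names(b)
#   #> "BATCH1" "BATCH2" "BATCH3"
#   sapply( b, length )                                # groups per batch
#   #> BATCH1 BATCH2 BATCH3
#   #>      2      2      1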



#' Apply split_index() to All Years
#'
#' Creates a YEAR folder for each year and saves one BATCHFILE list in each.
#'
#' @param index An index data frame containing columns `URL` and `TaxYear`.
#' @param years A vector of years that will be included in the build.
#' @param group.size An integer specifying the number of URLs in each group used in parallel processing.
#'
#' @return NULL.
#' @examples
#' create_batchfiles( tinyindex, years=2020:2022, group.size=100 )
#' @export
create_batchfiles <- function( index, years, group.size ){
  years <- as.character(years)
  purrr::walk( years, split_index, index=index, group.size=group.size )
}

#' @title Retrieve a Batchfile for a Given Year
#' @description Load a batchfile (URLs split into groups) generated from create_batchfiles(). 
#' @param year Specifies which batchfile to retrieve.
#' @return A named list of URL groups (the batchfile), or NULL if none exists.
#' @examples
#' create_batchfiles( tinyindex, years=2020, group.size=100 )
#' bf <- get_batchfile( 2020 )
#' names( bf )  # group IDs
#' @export
get_batchfile <- function( year ){
  if( ! file.exists(paste0(year,"/BATCHFILE.RDS")) ){  
    print( "NO BATCHFILE EXISTS" )
    return(NULL) }
  bf <- readRDS(paste0(year,"/BATCHFILE.RDS"))
  return(bf)
}


#' Parse Batch IDs and Return Batch Counts
#'
#' Parse the batch IDs (e.g. "G1{100}", "G2{70}") and return
#'   only the batch counts (100, 70).
#'
#' @param batch.ids Vector of group IDs, e.g. created by get_batch_ids()
#'
#' @return A numeric vector of batch counts.
#' @examples
#' batch.ids <- c("G1{100}","G2{100}","G3{50}")
#' get_batch_counts( batch.ids )
#' @export
get_batch_counts <- function( batch.ids ){
  x <- stringr::str_match( batch.ids, ".*\\{([0-9]+)\\}")[, 2]
  n <- as.numeric(x)
  return(n)
}

#' Parse Batch IDs and Return Batch Names
#'
#' Parse the batch IDs (e.g. "G1{100}", "G2{70}") and return
#'   only the batch names ("G1","G2"). The batch names
#'   are used to retrieve URLs from the BATCHFILE and
#'   to delete the group from BATCHFILE after processing.
#'
#' @param batch.ids Vector of group IDs, e.g. created by get_batch_ids()
#'
#' @return A character vector of batch names.
#' @examples
#' batch.ids <- c("G1{100}","G2{100}","G3{50}")
#' get_batch_names( batch.ids )
#' @export
get_batch_names <- function( batch.ids ){
  x <- stringr::str_match( batch.ids, "(G[0-9]+)\\{[0-9]+\\}")[, 2]
  return(x)
}
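
# The two parsers are typically used together to report on the queue
# (a sketch; assumes a batchfile already exists for 2020):
#
#   bf  <- get_batchfile( 2020 )
#   ids <- names( bf )                   # e.g. "G01{100}" "G02{70}"
#   sum( get_batch_counts( ids ) )       # URLs left in the queue
#   get_batch_names( ids )               # e.g. "G01" "G02"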



#' Remove Batch of URLs from Queue
#'
#' Removes a single completed group from the BATCHFILE queue in the
#'   current working directory (a YEAR subfolder).
#'
#' @param x Group ID (e.g. "G3{25}") created by split_index()
#'
#' @return NULL
#' @examples
#' groups <- split_index( 2020, tinyindex )
#' setwd( "2020" )
#' L <- readRDS("BATCHFILE.RDS")
#' names(L)
#' remove_batch( "G3{25}" )
#' L <- readRDS("BATCHFILE.RDS")
#' names(L)
#' setwd( ".." )
#' @export
remove_batch <- function(x){
  L <- readRDS("BATCHFILE.RDS")
  L[[x]] <- NULL
  saveRDS( L, "BATCHFILE.RDS" )
}

#' Remove Groups of URLs from Queue
#'
#' Removes completed groups from the BATCHFILE queue in the
#'   current working directory (a YEAR subfolder).
#'
#' @param groups A character vector of group IDs (e.g. "G2{25}") created by split_index()
#'
#' @return NULL
#' @examples
#' groups <- split_index( 2020, tinyindex, group.size=25 )
#' setwd( "2020" )
#' L <- readRDS("BATCHFILE.RDS")
#' names(L)
#' remove_groups( c("G2{25}","G3{25}") )
#' L <- readRDS("BATCHFILE.RDS")
#' names(L)
#' setwd( ".." )
#' @export
remove_groups <- function(groups){
  L <- readRDS("BATCHFILE.RDS")
  L[groups] <- NULL
  saveRDS( L, "BATCHFILE.RDS" )
}
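
# Together remove_batch() and remove_groups() implement the resumable
# queue: completed group IDs are deleted from BATCHFILE.RDS, so a
# restarted build only processes what remains. A sketch, run from
# inside a YEAR folder (the "done" IDs are hypothetical):
#
#   L <- readRDS( "BATCHFILE.RDS" )
#   done <- c( "G01{25}", "G02{25}" )
#   remove_groups( done )
#   setdiff( names(L), names(readRDS("BATCHFILE.RDS")) )
#   #> "G01{25}" "G02{25}"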



#' Apply Specified Filters to the Build Index
#'
#' @param years The desired tax years to include in the build.
#' @param index A Data Commons index from build_index(), get_current_index_full(), or get_current_index_batch().
#' @param form.type The form types to keep (default: c("990", "990EZ")).
#'
#' @return A filtered index data frame.
#' @examples
#' index <- get_current_index_batch()
#' index <- prep_index( years=2023, index=index )
#' @export
prep_index <- function( years=NULL, index=NULL, form.type=c("990", "990EZ") ){
    if (is.null(index)) { index <- build_index() }
    if (is.null(years)) { years <- index[["TaxYear"]] |> unique() }
    years <- years |> as.character() |> sort()
    index <- dplyr::filter(index, TaxYear %in% years)
    index <- dplyr::filter(index, FormType %in% form.type )
    index <- index |> dplyr::distinct( URL, .keep_all = TRUE )
    return(index)
}    
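
# A sketch of the filtering logic (assumes an index object with TaxYear,
# FormType, and URL columns): restrict to one year of full 990 returns.
#
#   index <- prep_index( years = 2021, index = index, form.type = "990" )
#   table( index$TaxYear, index$FormType )   # all rows are 2021 / 990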


#' Database Build Steps for a Year of Data
#'
#' Using the BATCHFILE created by create_batchfiles(),
#'   load the list of batched URLs, then send each
#'   group within a batch to a core for XML parsing
#'   using parallel processing functions.
#' 
#' @param year The year of data you would like processed.
#' @param num.cores The number of cores allocated for parallel processing.
#'
#' @return NULL. The build_tables() function serves as the workhorse
#'   when parsing XML files into rectangular RDB tables; the batched
#'   CSV files are saved in the YEAR subfolder.
#' 
#' @details The function should be run from the main project 
#'   folder that contains the YEAR subfolders created by 
#'   create_batchfiles(). A YEARX folder containing a BATCHFILE
#'   should exist when calling build_one_year( year=YEARX ).
#'
#' @examples
#' create_batchfiles( tinyindex, years=2020:2022, group.size=100 )
#' build_one_year( year=2021, num.cores=2 )
#' @export
build_one_year <- function(year, num.cores) {

  setwd(as.character(year))
  on.exit(setwd(".."))  # return to main folder on exit

  start.count <- 0
  if( file.exists("../FAILED-URLS.txt") ){start.count <- R.utils::countLines("../FAILED-URLS.txt")}  
  
  batchfile  <- get_batchfile(year = ".")
  batch.list <- split_into_batches( batchfile, numcores=num.cores )
  batch.ids  <- names( batch.list )
  n.urls <- batchfile |> unlist() |> length()
  # batchfile[["COMPLETE"]] <- NULL
  
  start.time <- Sys.time()
  cat(paste0("STARTING YEAR ", year, "\n"))
  cat(paste0("There are ", n.urls, " returns in ", year, ".\n"))
  if (n.urls < 1) { return(NULL) }
  cat(paste0("There are ", length(batchfile), " groups being sent for parallel collection.\n\n"))
  
  completed.batches <- purrr::map( batch.list, ~ process_batch(.x, year), .progress = FALSE)
  completed.batches <- unlist(completed.batches)
  
  batchfile  <- get_batchfile(year = ".")
  remaining.groups <- setdiff( names(batchfile), completed.batches )
  # cat(paste0("\nThere are ", length(remaining.groups), " groups left to process\n"))
  
  end.time <- Sys.time()
  total.mins <- round(difftime(end.time, start.time, units = "mins"), 2)
  
  failed.urls <- character(0)
  if( file.exists("../FAILED-URLS.txt") ){ failed.urls <- readLines("../FAILED-URLS.txt") }
  cat(paste0("\nThere were ", length(failed.urls) - start.count, " failed URLS\n"))
  cat(paste0("Time for the ", year, " loop: ", total.mins, " minutes\n\n"))
  cat(paste0("\n###########################\n"))
  cat(paste0("###########################\n\n\n"))
}
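
# build_one_year() assumes the working-directory layout created by
# create_batchfiles(); a sketch of the expected structure:
#
#   ./                     # main project folder (run from here)
#   ./2020/BATCHFILE.RDS   # queue of URL groups for 2020
#   ./2021/BATCHFILE.RDS
#   ./FAILED-URLS.txt      # accumulates across years (if any failures)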


#' Build a Complete Database
#'
#' This function builds a database of IRS 990 e-filers available on AWS.
#'
#' @param index An optional index file created from `build_index()`. If not provided, it will be generated automatically.
#' @param years A vector of tax years to include in the build. If NULL, all available years are included.
#' @param batch.size The number of files to send to each core in parallel processing. 
#' @param spare.cores The number of cores to hold in reserve when deploying parallel processing.
#'
#' @return Saves log files, database tables, and build information to the local environment. Returns NULL.
#' @details The function filters forms by type (990, 990EZ) and processes the specified tax years. Logs and session information are saved for reproducibility.
#' @examples
#' \dontrun{
#' index <- build_index()
#' build_database(index, years = 2015:2017)
#' }
#' @export
build_database <- function(index=NULL, years=NULL, batch.size=25, spare.cores=1 ) {

    # Set parallel strategy at the highest level
    num.cores <- future::availableCores() - spare.cores
    future::plan(future::multisession, workers = num.cores )  
    
    index <- prep_index( years=years, index=index )
    
    if (is.null(years)) { 
        years <- index[["TaxYear"]] |> unique() |> as.character() |> sort() 
    }

    create_batchfiles( index, years, group.size=batch.size )
    dir.create("HIST", showWarnings = FALSE)

    start.build.time <- Sys.time()
    session.info <- sessionInfo()
    dump(list = "session.info", file = "HIST/SESSION-INFO.R")
    saveRDS(index, "HIST/build-index.rds")

    # Redirect standard output and messages
    zz <- file( "BUILD-LOG.txt", open = "at" )
    sink( zz, split = TRUE )                            # Redirect standard output
    sink( zz, type = "message", append = TRUE )         # Redirect messages

    on.exit({
      sink(type = "message")      # Restore message output to console
      sink()                      # Restore standard output
      close(zz)                   # Close the file connection
      file.show("BUILD-LOG.txt")  # View the logs
    })
    
    cat(paste0("\nDATABASE BUILD START TIME: ", Sys.time(),"\n\n"))
    cat(paste0("You have ", future::availableCores(), " cores available for parallel processing.\n"))
    cat(paste0("You have allocated ", num.cores, " cores for collection.\n"))
    cat(paste0("Years: ", paste0(years, collapse = ";"),"\n"))
    cat(paste0("There are ", nrow(index), " returns in this build.\n\n"))
    cat( table( index$TaxYear ) |> knitr::kable(), sep="\n" )
    cat(paste0("\n\n###########################\n"))
    cat(paste0("###########################\n\n\n"))

    purrr::walk( years, build_one_year, num.cores=num.cores, .progress = FALSE)  # sequential over years; parallelized within build_one_year()

    cat(paste0("COMPILING FILES\n\n\n"))
    cat(paste0("###########################\n"))
    cat(paste0("###########################\n\n"))
    
    # combine split files into /COMPILED/ folder
    # aggregate xpath and odd case logs:
    # > MISSING-XPATHS.txt
    # > COLLAPSED-RECORDS.txt
    bind_data(years = years)

    end.build.time <- Sys.time()
    cat(paste0("DATABASE BUILD FINISH TIME: ", Sys.time(), " \n"))
    cat(paste0("TOTAL BUILD TIME: ", round(difftime(end.build.time, start.build.time, units = "hours"), 2), " HOURS\n\n"))

    # savehistory("HIST/build-history.Rhistory")
    
    return(NULL)
}


#' @title Resume build_database() if Interrupted
#' @description If you need to pause build_database() before
#'   it is complete, or your computer restarts, you can resume
#'   the process from the stopping point.
#' @param index Use the same index object that was originally passed to build_database().
#'   If left NULL, it loads the version saved when build_database() was first called (HIST/build-index.rds).
#' @param years A vector of years remaining in the build. If NULL, all index years will be attempted. 
#' @param batch.size The number of files to send to each core in parallel processing. 
#' @param spare.cores The number of cores in reserve while deploying parallel processing. 
#' @export
resume_build_database <- function( years=NULL, index=NULL, batch.size=25, spare.cores=1  ) {

    if( is.null(index) ){ index <- readRDS( "HIST/build-index.rds" ) }
    
    if (is.null(years)) 
    { years <- index[["TaxYear"]] |> unique() |> as.character() |> sort() }

    index <- prep_index( years=years, index=index )

    #-------------

    num.cores <- future::availableCores() - spare.cores
    future::plan(future::multisession, workers = num.cores )  

    start.build.time <- Sys.time()
    session.info <- sessionInfo()
    dump(list = "session.info", file = "HIST/SESSION-INFO-RESUME.R")

    closeAllConnections()
  
    # Redirect standard output and messages
    zz <- file( "BUILD-LOG.txt", open = "at" )
    sink( zz, split = TRUE )                            # Redirect standard output
    sink( zz, type = "message", append = TRUE )         # Redirect messages

    on.exit({
      sink(type = "message")      # Restore message output to console
      sink()                      # Restore standard output
      close(zz)                   # Close the file connection
      file.show("BUILD-LOG.txt")  # View the logs
    })

    cat(paste0("\n#--------------------------------#\n"))
    cat(paste0("\n\nRESUMING DATABASE BUILD\n"))
    cat(paste0("###########################\n"))
    cat(paste0("###########################\n\n"))
    
    cat(paste0("NEW DATABASE BUILD START TIME: ", Sys.time(),"\n")) 
    cat(paste0("You have ", future::availableCores(), " cores available for parallel processing.\n"))
    cat(paste0("You have allocated ", num.cores, " cores for collection.\n"))
    cat(paste0("Years: ", paste0(years, collapse = ";"),"\n"))
    cat(paste0("There are ", nrow(index), " returns in this build.\n\n"))
    cat( table( index$TaxYear ) |> knitr::kable(), sep="\n" )
    cat(paste0("\n\n###########################\n"))
    cat(paste0("###########################\n\n\n"))

    purrr::walk( years, build_one_year, num.cores=num.cores, .progress = FALSE)  # sequential over years; parallelized within build_one_year()

    cat(paste0("COMPILING FILES\n\n\n"))
    cat(paste0("###########################\n"))
    cat(paste0("###########################\n\n"))
    
    # combine split files into /COMPILED/ folder
    # aggregate xpath and odd case logs:
    # > MISSING-XPATHS.txt
    # > COLLAPSED-RECORDS.txt
    bind_data(years = years)

    end.build.time <- Sys.time()
    cat(paste0("DATABASE BUILD FINISH TIME: ", Sys.time(), " \n"))
    cat(paste0("TOTAL BUILD TIME: ", round(difftime(end.build.time, start.build.time, units = "hours"), 2), " HOURS\n\n"))
    
    return(NULL)
}




#' Combine Batched CSV Files
#'
#' Combines all raw table files generated during parallel processing into compiled data.
#'
#' @param years A vector of tax years to compile. These correspond to subdirectories containing raw table files.
#'
#' @return Saves compiled data tables as CSV and RDS files in the "COMPILED" directory.
#' @details The function loops through raw table files, consolidates them by table name, and ensures unique entries. Results are stored for each tax year.
#' @examples
#' \dontrun{
#' bind_data(years = 2015:2017)
#' }
#' @export
bind_data <- function(years)
{
    dir.create("COMPILED")

    years <- as.character(years)
    for (i in years) 
    {
        setwd(i)
        file.names <- dir()

        # Drop the dates from the end and combine years
        x <- substr(file.names, 6, nchar(file.names) - 30)
        table.names <- unique(x)
        table.names <- table.names[!table.names == ""]

        for (j in table.names) 
        {
            these <- grepl(j, file.names) & grepl("\\.csv$", file.names)
            loop.list <- file.names[these]

            dk.list <- list()
            for (k in loop.list) 
            {
               if( file.info(k)$size > 0 )
               { 
                 d.k <- data.table::fread( k, colClasses="character" ) 
                 dk.list[[ k ]] <- d.k  
               }
            }

            d <- dplyr::bind_rows( dk.list )
            if( nrow(d)==0 ){ next }
            d <- unique(d)

            # Drop the -time from table name
            j <- substr(j, 1, nchar(j) - 5)

            data.table::fwrite(d, paste0("../COMPILED/", j, "-", i, ".csv"), row.names = FALSE)

        } # End j loop

        setwd("..")

    } # End i loop

    # collapsed records are 
    # cases that should be a single
    # value but are not
    
    # missing xpaths are those in
    # the xml files but not the
    # concordance 
    
    dir.create("FIX")
    
    DFX <- list()  # missing xpaths
    DFR <- list()  # collapsed records
    DFE <- list()  # url errors
    
    for( k in years )
    {
        setwd( k )
        DFX[[k]] <- get_missing_xpath_df()

        dfr.k <- paste0( "COLLAPSED-RECORDS-", k, ".txt" )
        if( file.exists(dfr.k) ){
          DFR[[k]] <- readLines( dfr.k )
        }
        
        dfe.k <- "FAILED-URLS.txt"
        if( file.exists(dfe.k) ){
          DFE[[k]] <- readLines( dfe.k )
        }
        
        setwd("..")
    }
        
    df <- dplyr::bind_rows( DFX )
    data.table::fwrite( df, "FIX/MISSING-XPATHS-ALL.CSV" ) 
    tb <- df %>% count_xpaths()
    data.table::fwrite( tb, "FIX/MISSING-XPATHS-COUNT.CSV" )

    recs <- unlist( DFR )
    if( length(recs) > 0 )
    { 
      fileConnRecs <- file( "FIX/COLLAPSED-RECORDS-ALL-YEARS.txt" )
      writeLines( recs, fileConnRecs )
      close( fileConnRecs )
    }

    url.errors <- unlist( DFE )
    if( length(url.errors) > 0 )
    { 
      fileConnURLs <- file( "FAILED-URLS.txt"  )
      writeLines( url.errors, fileConnURLs )
      close( fileConnURLs )
    }
}
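
# After bind_data() the compiled tables can be read back directly; a
# sketch assuming a hypothetical table named F9-P00-T00-HEADER was
# produced for 2020 (actual table names vary by concordance):
#
#   list.files( "COMPILED" )
#   d <- data.table::fread( "COMPILED/F9-P00-T00-HEADER-2020.csv",
#                           colClasses = "character" )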