R/TABLE_FUNCTIONS.R

Defines functions process_batch build_tables parse_npo get_fxdf get_fx_names get_table_names

#' Get RDB Table Names from the Concordance File
#'
#' Retrieves the unique, non-empty table names found in the `rdb_table`
#' field of the concordance returned by `get_concordance()`.
#'
#' @param exclude A character vector of table codes to drop. Each code is
#'   wrapped in dashes (e.g. "T99" becomes "-T99-") and matched as a regular
#'   expression against the table names. Use `NULL` or an empty vector to
#'   keep every table.
#'
#' @return A character vector of table names with missing and empty
#'   entries removed.
#' @examples
#' table_names <- get_table_names()
#' all_tables <- get_table_names( exclude = NULL )
#' @export
get_table_names <- function( exclude = c("T99") ) {
  concordance <- get_concordance()
  table.names <- unique(concordance[["rdb_table"]])
  # Drop NA as well as "": `x != ""` evaluates to NA for missing values and
  # subsetting with NA would keep them as NA entries.
  table.names <- table.names[!is.na(table.names) & table.names != ""]
  # length() covers both NULL and zero-length vectors; an empty `exclude`
  # would otherwise be recycled by paste0() into the unintended pattern "--".
  if (length(exclude) > 0) {
    pattern <- paste0("-", exclude, "-", collapse = "|")
    table.names <- table.names[!grepl(pattern, table.names)]
  }
  table.names
}


#' Get Function Names
#'
#' Converts table names into the names of their build functions by
#' replacing dashes with underscores and prefixing "BUILD_".
#'
#' @param table.names A character vector of table names. If `NULL`
#'   (default), the names returned by `get_table_names()` are used.
#'
#' @details Defaults to all tables in the concordance minus T99 table
#'     (supplementary information sections). The BUILD_SCHEDULE_TABLE()
#'     function is not defined in the concordance, but is added to the list.
#'
#' @return A character vector of function names, always ending with
#'   "BUILD_SCHEDULE_TABLE".
#' @examples
#' table_names <- get_table_names( exclude="T99" )
#' fx_names <- get_fx_names( table_names )
#' @export
get_fx_names <- function( table.names=NULL ) {
  if (is.null(table.names)) {
    table.names <- get_table_names()
  }
  # recycle0 = TRUE keeps an empty input empty; by default paste0() recycles
  # a zero-length vector to "" and would emit a bogus "BUILD_" entry.
  fx.names <- paste0("BUILD_", gsub("-", "_", table.names), recycle0 = TRUE)
  c(fx.names, "BUILD_SCHEDULE_TABLE")
}

#' Pool and export the data frame produced by one table build function
#'
#' Helper that pulls a single table (the one produced by `fx.name`) out of
#' each parsed nonprofit, stacks the pieces into one data frame and, when
#' the result has at least one row, writes it to
#' "<year>-<table name>-<time>.csv" in the current working directory.
#' It is used by build_tables() to write a batch of the data to file.
#'
#' @param fx.name A function name from get_fx_names().
#' @param all.npos A list of parsed NPO data, one element per nonprofit.
#' @param time A string representing the timestamp for the file.
#' @param year An integer specifying the tax year.
#'
#' @return The pooled data frame, invisibly. A CSV file is written as a
#'   side effect when the data frame is not empty.
#' @examples
#' # extract all tables for ten 990 filers
#' fx.names <- get_fx_names()
#' timestamp <- format(Sys.time(), "%b-%d-%Y-%Hh-%Mm")
#' # sample of 10 orgs in 2020
#' i2 <- dplyr::filter(
#'       tinyindex,
#'       TaxYear == 2020,
#'       FormType %in% c("990","990EZ") )
#' urls <- i2$URL[1:10]
#' # pool data for the given table from npos in the sample
#' all.npos <- purrr::map( urls, parse_npo, year = 2020, fx.names = fx.names )
#' get_fxdf( "BUILD_F9_P01_T00_SUMMARY", all.npos, timestamp, 2020 )
#' # CREATES FILE: "2020-F9-P01-T00-SUMMARY-Jan-22-2025-15h-13m.csv"
#' @export
get_fxdf <- function(fx.name, all.npos, time, year) {
  # Skip the first six characters (the "BUILD_" prefix), then turn the
  # underscores back into the dashes used in table names.
  table.label <- chartr("_", "-", substring(fx.name, 7))

  # One piece per nonprofit; elements without this table contribute NULL.
  pieces <- lapply(all.npos, function(npo) npo[[fx.name]])
  pooled <- dplyr::bind_rows(pieces)

  if (nrow(pooled) > 0) {
    csv.path <- paste0(year, "-", table.label, "-", time, ".csv")
    data.table::fwrite(pooled, file = csv.path)
  }

  invisible(pooled)
}


#' Parse NPO Data
#'
#' Reads one XML return from a URL and runs each table build function on it.
#'
#' @param url A string containing the URL of the XML file.
#' @param year The tax year; used as the name of the directory in which
#'   missing xpaths are logged when `logXP` is `TRUE`.
#' @param fx.names A character vector of build function names (see
#'   `get_fx_names()`); each is called with the parsed document and the URL.
#' @param logXP If `TRUE` (default), call `log_missing_xpaths()` from inside
#'   the `year` directory, creating the directory if needed.
#'
#' @return A list with one element per build function (named by `fx.names`
#'   when it is a character vector), or `list(FAIL = url)` if the XML could
#'   not be read.
#' @examples
#' \dontrun{
#' npo_data <- parse_npo( url, year = 2020, fx.names = get_fx_names() )
#' }
#' @export
parse_npo <- function( url, year, fx.names, logXP=TRUE ) {

  doc <- NULL

  # `doc` stays NULL if the read fails, which is reported as a failed URL.
  try( doc <- xml2::read_xml(url) )

  if (is.null(doc)) {
    log_failed_url( url )
    return(list(FAIL = url))
  }

  xml2::xml_ns_strip(doc)
  # NOTE(review): not referenced again in this function; kept in case the
  # call matters to the build functions -- confirm before removing.
  TABLE.HEADERS <- get_table_headers()

  # simplify = FALSE guarantees a list. Plain sapply() collapses the results
  # into a vector or matrix whenever they happen to share a length (e.g. a
  # single build function), after which tables can no longer be extracted
  # by name with `[[`.
  one.npo <- sapply( fx.names, do.call, list(doc, url), simplify = FALSE )

  if (logXP) {
    year.dir <- as.character(year)
    dir.create( year.dir, showWarnings=FALSE )
    old.wd <- setwd(year.dir)
    # Restore the working directory even if logging fails.
    tryCatch( log_missing_xpaths( doc, url ), finally = setwd(old.wd) )
  }

  # Cleanup
  rm(doc)
  gc()

  return(one.npo)
}


#' Build Tables
#'
#' Parses every URL with `parse_npo()`, then pools each table across the
#' parsed returns and writes it to CSV with `get_fxdf()`.
#'
#' @param urls A vector of URLs to process.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#'   Defaults to `get_fx_names( table.names )`.
#' @param table.names Optional vector of table names, used only when
#'   `fx.names` is `NULL`. Defaults to `get_table_names( exclude = "T99" )`.
#'
#' @details Output files share one timestamp with a random five-letter
#'   suffix, so files from calls made in the same minute are unlikely to
#'   collide.
#'
#' @return A vector of failed URLs, or `NULL` if none failed.
#' @examples
#' failed_urls <- build_tables(urls, year = 2023)
#' @export
build_tables <- function(urls, year, fx.names = NULL, table.names = NULL) {

  # Table names are only needed to derive the default function names.
  if (is.null(fx.names)) {
    if (is.null(table.names)) {
      table.names <- get_table_names( exclude = "T99" )
    }
    fx.names <- get_fx_names( table.names )
  }

  all.npos <- furrr::future_map(urls, parse_npo, year=year, fx.names=fx.names, .progress = FALSE)

  time <- format(Sys.time(), "%b-%d-%Y-%Hh-%Mm")
  rand <- paste(sample(LETTERS, 5), collapse = "")
  time <- paste0("time-", time, "-", rand)

  # get_fx_names() already appends BUILD_SCHEDULE_TABLE; unique() keeps that
  # table from being pooled and written twice.
  fx.names <- unique(c(fx.names, "BUILD_SCHEDULE_TABLE"))

  purrr::walk(fx.names, get_fxdf, all.npos, time, year)

  # FIND ALL FAILED URLS
  failed.urls <- lapply(all.npos, '[[', "FAIL") |> unlist()
  return(failed.urls)
}






#' Process and Build Batches
#'
#' Runs `build_tables()` on every batch of URLs. A batch that raises an
#' error is logged to "../ERROR-LOG.txt" and skipped, so the remaining
#' batches still run. Completed batches are passed to `remove_groups()`
#' and reported on the console.
#'
#' @param batch A list of URL vectors, one element per batch. Element names
#'   (or positions, if unnamed) are used as the batch IDs.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#'   Defaults to `get_fx_names()`.
#'
#' @return A character vector of completed batch IDs.
#' @export
process_batch <- function( batch, year, fx.names=NULL ) {

  if (is.null(fx.names)) {
    fx.names <- get_fx_names()
  }

  batch.status <- purrr::imap_chr( batch, function(urls, batch.id) {
    tryCatch(
      {
        build_tables( urls, year = year, fx.names = fx.names )
        as.character(batch.id)  # Return the batch ID if successful
      },
      error = function(e) {
        message(sprintf("Error in batch.id: %s, year: %s", batch.id, year))
        log_file <- "../ERROR-LOG.txt"
        error_msg <- sprintf(
          "[%s] Error in batch.id: %s, year: %s - %s\n",
          format(Sys.time(), "%Y-%m-%d %H:%M:%S"),
          batch.id, year, conditionMessage(e)
        )
        cat(error_msg, file = log_file, append = TRUE)
        # imap_chr() needs exactly one string per batch; returning NULL here
        # would itself raise an error and abort the remaining batches.
        NA_character_
      }
    )
  }, .progress = FALSE)

  # Failed batches are marked NA above; keep only the ones that finished.
  completed.batches <- batch.status[!is.na(batch.status)]

  if (length(completed.batches) > 0) {
    remove_groups( completed.batches )
    batch.seq <- paste0(completed.batches, collapse = " ")
    timestamp <- format(Sys.time(), "%I:%M %p -- %b %d %Y")
    msg <- paste0("  >> ", timestamp, " -- COMPLETED ", batch.seq, "\n")
    # Writing this message to ../BUILD-LOG.txt is currently disabled;
    # progress is echoed to the console only.
    cat(msg)
    flush.console()
  }

  return(completed.batches)
}

 
Nonprofit-Open-Data-Collective/irs990efile documentation built on June 11, 2025, 11:06 a.m.