#' Get RDB Table Names from the Concordance File
#'
#' Retrieves the list of unique table names defined in the Concordance rdb_table field.
#'
#' @param exclude A character vector of table codes (e.g. "T99"); any table name containing one of these codes as a dash-delimited segment (e.g. "-T99-") is dropped.
#'
#' @return A vector of table names.
#' @examples
#' table_names <- get_table_names()
#' @export
get_table_names <- function( exclude = c("T99") ) {

  concordance <- get_concordance()
  table.names <- concordance[["rdb_table"]] |> unique()
  table.names <- table.names[ table.names != "" ]

  if ( !is.null(exclude) ) {
    exclude <- paste0( "-", exclude, "-", collapse = "|" )
    table.names <- table.names[ !grepl(exclude, table.names) ]
  }

  return( table.names )
}
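
# Illustrative sketch (not run): each exclude code is wrapped in dashes and
# collapsed into a regex, so any table name containing that segment is dropped.
# The second table name below is hypothetical, chosen only to show the matching.
if (FALSE) {
  tnames  <- c("F9-P01-T00-SUMMARY", "F9-P16-T99-OTHER")
  pattern <- paste0("-", c("T99"), "-", collapse = "|")   # "-T99-"
  tnames[!grepl(pattern, tnames)]                         # keeps only the first name
}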
#' Get Function Names
#'
#' Builds the list of table build function names from a vector of table names.
#'
#' @param table.names A vector of table names.
#'
#' @details Defaults to all tables in the concordance minus the T99 tables
#' (supplementary information sections). The BUILD_SCHEDULE_TABLE()
#' function is not defined in the concordance, but is added to the list.
#'
#' @return A vector of function names.
#' @examples
#' table_names <- get_table_names( exclude="T99" )
#' fx_names <- get_fx_names( table_names )
#' @export
get_fx_names <- function( table.names = NULL ) {

  if ( is.null(table.names) ) {
    table.names <- get_table_names()
  }

  fx.names <- gsub( "-", "_", table.names )
  fx.names <- paste0( "BUILD_", fx.names )
  fx.names <- c( fx.names, "BUILD_SCHEDULE_TABLE" )

  return( fx.names )
}
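
# Illustrative sketch (not run): table names map to build function names by
# swapping dashes for underscores and adding a BUILD_ prefix; BUILD_SCHEDULE_TABLE
# is appended separately because it has no concordance entry.
if (FALSE) {
  tname <- "F9-P01-T00-SUMMARY"               # example table name
  paste0("BUILD_", gsub("-", "_", tname))     # "BUILD_F9_P01_T00_SUMMARY"
}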
#' Get the data frame generated by a table build function.
#'
#' Helper function that extracts a single table (the one associated with the given build function name) from a list of parsed results for multiple nonprofits, combines the rows, and writes them to a CSV file. It is used by build_tables() to write each batch of data to file.
#'
#' @param fx.name A function name from get_fx_names().
#' @param all.npos A list of parsed NPO data.
#' @param time A string representing the timestamp for the file.
#' @param year An integer specifying the tax year.
#'
#' @return The combined data frame, invisibly. A CSV file is written when the table contains at least one row.
#' @examples
#' # extract all tables for ten 990 filers
#' fx.names <- get_fx_names()
#' timestamp <- format(Sys.time(), "%b-%d-%Y-%Hh-%Mm")
#' # sample of 10 orgs in 2020
#' i2 <- dplyr::filter(
#' tinyindex,
#' TaxYear == 2020,
#' FormType %in% c("990","990EZ") )
#' urls <- i2$URL[1:10]
#' # pool data for the given table from npos in the sample
#' all.npos <- purrr::map( urls, parse_npo, fx.names )
#' get_fxdf( "BUILD_F9_P01_T00_SUMMARY", all.npos, timestamp, 2020 )
#' # CREATES FILE: "2020-F9-P01-T00-SUMMARY-Jan-22-2025-15h-13m.csv"
#' @export
get_fxdf <- function( fx.name, all.npos, time, year ) {

  # derive the table name from the function name, e.g.
  # "BUILD_F9_P01_T00_SUMMARY" -> "F9-P01-T00-SUMMARY"
  t.name <- substr( fx.name, start = 7, stop = nchar(fx.name) )
  t.name <- gsub( "_", "-", t.name )

  # pool the rows for this table from all parsed nonprofits
  df.list <- lapply( all.npos, '[[', fx.name )
  df <- dplyr::bind_rows( df.list )

  if ( nrow(df) > 0 ) {
    data.table::fwrite( df, file = paste0( year, "-", t.name, "-", time, ".csv" ) )
  }

  return( invisible(df) )
}
#' Parse NPO Data
#'
#' Parses XML data from a given URL and applies specified processing functions.
#'
#' @param url A string containing the URL of the XML file.
#' @param fx.names A character vector of build function names (see get_fx_names()) to apply to the XML document.
#' @param logXP Logical; if TRUE, missing xpaths are logged via log_missing_xpaths().
#'
#' @return A named list with parsed data or a failure indicator if the URL is inaccessible.
#' @examples
#' npo_data <- parse_npo(url, fx.names)
#' @export
parse_npo <- function( url, fx.names, logXP=TRUE ) {

  # attempt to read the XML return; log and flag the URL on failure
  doc <- NULL
  try( doc <- xml2::read_xml(url) )

  if ( is.null(doc) ) {
    log_failed_url( url )
    return( list( FAIL = url ) )
  }

  xml2::xml_ns_strip( doc )

  TABLE.HEADERS <- get_table_headers()

  # call each build function by name on the parsed document;
  # the result is a named list of data frames, one per table
  one.npo <- sapply( fx.names, do.call, list(doc, url) )

  if (logXP) {
    log_missing_xpaths( doc, url )
  }

  # Cleanup
  rm( doc )
  gc()

  return( one.npo )
}
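
# Illustrative sketch (not run): the sapply()/do.call() step above is roughly
# equivalent to calling each build function by name on the parsed document and
# collecting the returned data frames in a named list ('doc', 'url', and
# 'fx.names' refer to the objects inside parse_npo()).
if (FALSE) {
  one.npo <- list()
  for (fx in fx.names) {
    one.npo[[fx]] <- do.call(fx, list(doc, url))
  }
}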
#' Build Tables
#'
#' Extracts and writes table data from URLs.
#'
#' @param urls A vector of URLs to process.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#'
#' @return A vector of failed URLs.
#' @examples
#' failed_urls <- build_tables(urls, year = 2023)
#' @export
build_tables <- function( urls, year, fx.names = NULL ) {

  # LOAD ALL FX NAMES IF NONE PROVIDED
  if ( is.null(fx.names) ) { fx.names <- get_fx_names() }

  # BUILD TABLES FOR ALL FUNCTIONS
  all.npos <- purrr::map( urls, parse_npo, fx.names )

  # SAVE BATCHED TABLES TO FILE
  # (A RANDOM SUFFIX KEEPS FILENAMES UNIQUE ACROSS BATCHES)
  time <- format( Sys.time(), "%b-%d-%Y-%Hh-%Mm" )
  rand <- paste( sample(LETTERS, 5), collapse = "" )
  time <- paste0( "time-", time, "-", rand )
  purrr::walk( fx.names, get_fxdf, all.npos, time, year )

  # FIND ALL FAILED URLS
  failed.urls <- lapply( all.npos, '[[', "FAIL" ) |> unlist()

  return( failed.urls )
}
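
# Illustrative sketch (not run): one call to build_tables() writes one CSV per
# table that returned rows, named "<year>-<TABLE-NAME>-time-<timestamp>-<RANDOM>.csv",
# and returns the URLs that failed to parse. The index filter below mirrors the
# get_fxdf() example above.
if (FALSE) {
  i2 <- dplyr::filter(tinyindex, TaxYear == 2020, FormType %in% c("990", "990EZ"))
  failed.urls <- build_tables(i2$URL[1:10], year = 2020)
  # e.g. "2020-F9-P01-T00-SUMMARY-time-Jan-22-2025-15h-13m-ABCDE.csv"
}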
#' @title Passing arguments to parSapply
#' @description Pass arguments to the parallel sapply table function.
#' @details Helper function to send variables to the build_tables() function within the parSapply framework.
#' @param batch.id A single batch ID identifying one group of URLs in the exported batch.list.
#' @return The batch ID of the completed batch.
#' @export
parsapply_tables <- function( batch.id ) {

  require( irs990efile )

  ## 'fx.names', 'year', and 'batch.list'
  ## passed through clusterExport()
  group.name  <- get_batch_names( batch.id )
  batch.urls  <- batch.list[[ group.name ]]
  failed.urls <- build_tables( batch.urls, fx.names = fx.names, year = year )

  return( batch.id )
}
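
# Illustrative sketch (not run): parsapply_tables() assumes that 'fx.names',
# 'year', and 'batch.list' already exist on each worker, e.g. exported with
# parallel::clusterExport() as in build_tables_parallel() below; 'cl' and
# 'batch.ids' here are placeholders.
if (FALSE) {
  parallel::clusterExport(cl, varlist = c("year", "fx.names", "batch.list"))
  parallel::parSapply(cl, X = batch.ids, FUN = parsapply_tables)
}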
#' @title Parallel Build of Tables
#' @description Builds tables in parallel using multiple cores.
#' @param batch.list A named list of URL batches to process.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#' @return A vector of failed URLs.
#' @examples
#' failed_urls <- build_tables_parallel(batch.list, year = 2023)
#' @export
build_tables_parallel <- function( batch.list, year, fx.names = NULL ) {

  if ( is.null(fx.names) ) { fx.names <- get_fx_names() }

  # configure clusters
  num.cores <- parallel::detectCores() - 1
  cl <- parallel::makeCluster( num.cores )

  # Ensure cluster stops on exit, even if there's an error
  on.exit( parallel::stopCluster(cl), add = TRUE )

  parallel::clusterExport( cl, varlist = c("year", "fx.names", "batch.list"), envir = environment() )

  # Force messages to print in real time
  parallel::clusterEvalQ(cl, {
    options(warn = 1)  # Ensures warnings and messages are printed immediately
    NULL
  })

  batch.ids <- get_batch_ids( batch.list )

  # SPLIT BATCH IDs INTO LISTS
  # WITH NUM OF ELEMENTS IN EACH LIST
  # CORRESPONDING TO AVAILABLE CORES
  f <- ( seq_along(batch.ids) + num.cores - 1 ) %/% num.cores
  max.char <- max( nchar(f) )
  f <- stringr::str_pad( f, width = max.char, side = "left", pad = "0" )
  f <- paste0( "TRANCH", f )
  f.names <- unique( f )
  f <- factor( f, levels = f.names )
  batch.ids.list <- split( batch.ids, f )

  completed.tasks <- lapply( batch.ids.list, send_batch, cl )

  failed.urls <- character()
  if ( file.exists("FAILED-URLS.txt") ) {
    failed.urls <- readLines( "FAILED-URLS.txt" )
  }

  return( failed.urls )
}
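
# Illustrative sketch (not run): a typical call. The batching scheme shown
# here is a plain split() over a hypothetical URL vector and is an assumption;
# the package's get_batch_ids()/get_batch_names() helpers may expect batch
# names in a different format.
if (FALSE) {
  urls <- tinyindex$URL[1:500]                                 # hypothetical sample
  batch.list <- split(urls, ceiling(seq_along(urls) / 100))    # 5 batches of 100
  failed.urls <- build_tables_parallel(batch.list, year = 2020)
}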
#' @title Error handling and messaging for parsapply_tables
#' @description Pass a group of batch IDs to the parallel sapply table function.
#' @details Logging and resource management for a subgroup of batches to prevent BATCHFILE read/write conflicts.
#' @param batch.ids A vector of batch IDs to process (one tranche).
#' @param cl A cluster object created by parallel::makeCluster().
#' @return The IDs of the completed batches.
#' @export
send_batch <- function( batch.ids, cl ) {

  # Run in parallel with enhanced error handling per batch
  completed.batches <- parallel::parSapply( cl, X = batch.ids, FUN = function(batch.id) {
    tryCatch(
      {
        # Ensure each batch.id is handled correctly
        parsapply_tables( batch.id )
      },
      error = function(e) {
        # Capture details about the error
        message( sprintf("Error in batch.id: %s, year: %s", batch.id, year) )
        # Log error in a file
        log_file <- "../ERROR-LOG.txt"
        error_msg <-
          sprintf( "[%s] Error in batch.id: %s, year: %s - %s\n",
                   format( Sys.time(), "%Y-%m-%d %H:%M:%S" ),
                   batch.id, year, e$message )
        cat( error_msg, file = log_file, append = TRUE )
        return(NULL)  # Return NULL for this batch, but continue processing others
      }
    )
  })

  if ( length(completed.batches) > 0 )
  {
    # remove completed tasks from the list
    purrr::walk( completed.batches, remove_batch )

    # Redirect output to log file
    build_log <- file( "../BUILD-LOG.txt", open = "at" )
    sink( build_log, append = TRUE, type = "message" )

    # Report progress
    batch.seq <- paste0( completed.batches, collapse = " " )
    timestamp <- format( Sys.time(), "%I:%M %p -- %b %d %Y" )
    timestamp <- paste0( " >> ", timestamp, " -- " )
    msg <- paste0( timestamp, "COMPLETED ", batch.seq, "\n" )
    cat( msg, sep = "" )
    flush.console()

    sink( type = "message" )  # Restore the message stream
    close( build_log )        # Close file connection
  }

  return( completed.batches )
}
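
# Illustrative sketch (not run): build_tables_parallel() hands batch IDs to
# send_batch() one tranche at a time, with tranche size equal to the number of
# worker cores. With 3 cores and 7 hypothetical batch IDs:
if (FALSE) {
  batch.ids <- paste0("BATCH-", 1:7)
  num.cores <- 3
  f <- (seq_along(batch.ids) + num.cores - 1) %/% num.cores
  split(batch.ids, paste0("TRANCH", f))
  # $TRANCH1: BATCH-1..3   $TRANCH2: BATCH-4..6   $TRANCH3: BATCH-7
}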