R/TABLE_FUNCTIONS.R

Defines functions: send_batch, build_tables_parallel, parsapply_tables, build_tables, parse_npo, get_fxdf, get_fx_names, get_table_names

#' Get RDB Table Names from the Concordance File
#'
#' Retrieves the list of unique table names defined in the Concordance rdb_table field.
#'
#' @param exclude A vector of substrings to exclude from table names.
#'
#' @return A vector of table names.
#' @examples
#' table_names <- get_table_names()
#' @export
get_table_names <- function( exclude = c("T99") ) {
  concordance <- get_concordance() 
  table.names <- concordance[["rdb_table"]] |> unique()
  table.names <- table.names[ table.names != "" ]
  if (!is.null(exclude)) {
    exclude <- paste0("-", exclude, "-", collapse = "|")
    table.names <- table.names[!grepl(exclude, table.names)]
  }
  return(table.names)
}
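
The exclusion step wraps each excluded code in hyphens before matching, so passing "T99" only removes tables whose names contain the segment "-T99-". A minimal sketch of the pattern logic, using made-up table names rather than the real concordance entries:

# hypothetical table names, for illustration only
table.names <- c( "F9-P01-T00-SUMMARY", "F9-P99-T99-NOTES" )
exclude <- c( "T99" )
pattern <- paste0( "-", exclude, "-", collapse = "|" )   # "-T99-"
table.names[ !grepl( pattern, table.names ) ]
#> [1] "F9-P01-T00-SUMMARY"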


#' Get Function Names
#'
#' Retrieves a list of function names based on table names and exclusion criteria.
#'
#' @param table.names A vector of table names.
#'
#' @details Defaults to all tables in the concordance minus the T99 tables
#'     (supplementary information sections). The BUILD_SCHEDULE_TABLE()
#'     function is not defined in the concordance, but is added to the list.
#' 
#' @return A vector of function names.
#' @examples
#' table_names <- get_table_names( exclude="T99" )
#' fx_names <- get_fx_names( table_names )
#' @export
get_fx_names <- function( table.names=NULL ) {
  if( is.null(table.names) ) 
  { table.names <- get_table_names() }
  fx.names <- gsub("-", "_", table.names)
  fx.names <- paste0("BUILD_", fx.names)
  fx.names <- c(fx.names, "BUILD_SCHEDULE_TABLE")
  return(fx.names)
}
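
The mapping from table names to build function names simply swaps hyphens for underscores and prepends "BUILD_", with BUILD_SCHEDULE_TABLE appended at the end. For example, using the summary table name that appears in the get_fxdf() example below:

get_fx_names( "F9-P01-T00-SUMMARY" )
#> [1] "BUILD_F9_P01_T00_SUMMARY" "BUILD_SCHEDULE_TABLE"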

#' Get the Data Frame Generated by a Table Build Function
#'
#' Helper function that extracts a single table (the one associated with the build function name passed as an argument) from a list of parsed nonprofits, binds the rows, and writes the result to file. It is used by build_tables() to save each batch of data.
#'
#' @param fx.name A function name from get_fx_names().
#' @param all.npos A list of parsed NPO data.
#' @param time A string representing the timestamp for the file.
#' @param year An integer specifying the tax year.
#'
#' @return The combined data frame, invisibly. If it contains rows, it is also written to a CSV file.
#' @examples
#' # extract all tables for ten 990 filers
#' fx.names <- get_fx_names()
#' timestamp <- format(Sys.time(), "%b-%d-%Y-%Hh-%Mm")
#' # sample of 10 orgs in 2020
#' i2 <- dplyr::filter( 
#'       tinyindex, 
#'       TaxYear == 2020,
#'       FormType %in% c("990","990EZ") )
#' urls <- i2$URL[1:10]  
#' # pool data for the given table from npos in the sample
#' all.npos <- purrr::map( urls, parse_npo, fx.names )
#' get_fxdf( "BUILD_F9_P01_T00_SUMMARY", all.npos, timestamp, 2020 )
#' # CREATES FILE: "2020-F9-P01-T00-SUMMARY-Jan-22-2025-15h-13m.csv"
#' @export
get_fxdf <- function(fx.name, all.npos, time, year) {
  t.name <- substr(fx.name, start = 7, stop = nchar(fx.name))
  t.name <- gsub("_", "-", t.name)
  df.list <- lapply(all.npos, '[[', fx.name)
  df <- dplyr::bind_rows(df.list)
  if( nrow(df) > 0 )
  { data.table::fwrite(df, file = paste0(year, "-", t.name, "-", time, ".csv")) }
  return(invisible(df))
}
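
The output file name is derived from the function name: the leading "BUILD_" prefix (the first six characters) is dropped and underscores become hyphens, so the file prefix matches the rdb_table name. For example:

fx.name <- "BUILD_F9_P01_T00_SUMMARY"
t.name <- substr( fx.name, start = 7, stop = nchar(fx.name) )
t.name <- gsub( "_", "-", t.name )
t.name
#> [1] "F9-P01-T00-SUMMARY"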


#' Parse NPO Data
#'
#' Parses XML data from a given URL and applies specified processing functions.
#'
#' @param url A string containing the URL of the XML file.
#' @param fx.names A character vector of table build function names to apply to the XML document.
#' @param logXP Logical. If TRUE (the default), missing xpaths are logged via log_missing_xpaths().
#'
#' @return A named list with parsed data or a failure indicator if the URL is inaccessible.
#' @examples
#' fx.names <- get_fx_names()
#' npo_data <- parse_npo( tinyindex$URL[1], fx.names )
#' @export
parse_npo <- function( url, fx.names, logXP=TRUE ) {

  doc <- NULL 
  
  try( doc <- xml2::read_xml(url) )

  if (is.null(doc)) {
    log_failed_url( url )
    return(list(FAIL = url))
  }

  xml2::xml_ns_strip(doc)
  TABLE.HEADERS <- get_table_headers() 

  one.npo <- sapply( fx.names, do.call, list(doc, url) )

  if (logXP) {
    log_missing_xpaths( doc, url )
  }

  # Cleanup
  rm(doc)
  gc()

  return(one.npo)
}
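
On success the returned list is keyed by the entries of fx.names, with each element holding the data frame produced by that build function for the filing; on failure it is a single-element list named FAIL containing the URL. A brief sketch, reusing the tinyindex sample index from the get_fxdf() example above and assuming its first URL points to a Form 990 filing:

fx.names <- get_fx_names()
url <- tinyindex$URL[1]                # one filing from the sample index
npo <- parse_npo( url, fx.names )
names( npo ) |> head()                 # element names match fx.names
npo[[ "BUILD_F9_P01_T00_SUMMARY" ]]    # the one-filer version of that table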


#' Build Tables
#'
#' Extracts and writes table data from URLs.
#'
#' @param urls A vector of URLs to process.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#'
#' @return A vector of failed URLs.
#' @examples
#' failed_urls <- build_tables(urls, year = 2023)
#' @export
build_tables <- function(urls, year, fx.names = NULL) {

  # LOAD ALL FX NAMES IF NONE PROVIDED
  if (is.null(fx.names)) { fx.names <- get_fx_names() }
  
  # BUILD TABLES FOR ALL FUNCTIONS 
  all.npos <- purrr::map( urls, parse_npo, fx.names )
  
  # SAVE BATCHED TABLES TO FILE
  time <- format(Sys.time(), "%b-%d-%Y-%Hh-%Mm")
  rand <- paste(sample(LETTERS, 5), collapse = "")
  time <- paste0("time-", time, "-", rand)
  purrr::walk(fx.names, get_fxdf, all.npos, time, year)
  
  # FIND ALL FAILED URLS
  failed.urls <- lapply(all.npos, '[[', "FAIL") |> unlist()
  return(failed.urls)
}
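
A typical workflow, following the sampling pattern from the get_fxdf() example above: filter the index to a tax year and form type, pass the URLs to build_tables(), and keep any failed URLs for a retry pass. This is a sketch only; batching and parallel execution are handled by build_tables_parallel() below.

i2 <- dplyr::filter( 
        tinyindex, 
        TaxYear == 2020,
        FormType %in% c("990","990EZ") )
urls <- i2$URL[1:10]

failed.urls <- build_tables( urls, year = 2020 )

# retry anything that failed on the first pass
if( length(failed.urls) > 0 )
{ failed.urls <- build_tables( failed.urls, year = 2020 ) }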


#' @title Passing arguments to parSapply
#' @description Passes arguments to the parallel sapply table-building function.
#' @details Helper function that supplies variables to build_tables() within the parSapply framework. The batch id is mapped to a group name with get_batch_names() and matched against batch.list; fx.names, year, and batch.list are exported to the workers via clusterExport().
#' @param batch.id A batch identifier from get_batch_ids().
#' @export
parsapply_tables <- function( batch.id ){

  require( irs990efile ) 
  
  ##  'fx.names', 'year', and 'batch.list'
  ##  passed through clusterExport()
  
  group.name <- get_batch_names( batch.id )
  batch.urls <- batch.list[[ group.name ]] 
  
  failed.urls <- build_tables(batch.urls, fx.names = fx.names, year = year) 
  
  return( batch.id )
}

#' @title Parallel Build of Tables
#' @description Builds tables in parallel using multiple cores.
#' @param batch.list A named list of URL batches to process.
#' @param year The tax year associated with the data.
#' @param fx.names A vector of function names for processing tables.
#' @return A vector of failed URLs.
#' @examples
#' failed_urls <- build_tables_parallel(batch.list, year = 2023)
#' @export
build_tables_parallel <- function( batch.list, year, fx.names = NULL) {
  
  if (is.null(fx.names)) {fx.names <- get_fx_names()}

  # configure clusters
  num.cores <- parallel::detectCores() - 1
  cl <- parallel::makeCluster(num.cores)
  
  # Ensure cluster stops on exit, even if there's an error
  on.exit( parallel::stopCluster(cl), add = TRUE )
  
  parallel::clusterExport(cl, varlist = c( "year", "fx.names", "batch.list"), envir = environment())
  
  # Force messages to print in real time
  parallel::clusterEvalQ(cl, {
    options(warn = 1)  # Print warnings immediately as they occur
    NULL
  })

  batch.ids <- get_batch_ids( batch.list )
  
  # SPLIT BATCH IDs INTO TRANCHES
  # WITH NUM OF ELEMENTS IN EACH TRANCH
  # CORRESPONDING TO AVAILABLE CORES
  f <- ((1:length(batch.ids)) + num.cores - 1) %/% num.cores 
  max.char <- max( nchar(f) )
  f <- stringr::str_pad( f, width=max.char, side="left", pad="0" )
  f <- paste0("TRANCH", f)
  f.names <- unique(f)
  f <- factor(f, levels = f.names )
  batch.ids.list <- split(batch.ids, f)
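  # e.g. with num.cores = 7 and 20 batch ids: f assigns ids 1-7 to
  # TRANCH1, ids 8-14 to TRANCH2, and ids 15-20 to TRANCH3, so each
  # tranch dispatches at most one batch per available core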
  
  completed.tasks <- lapply( batch.ids.list, send_batch, cl )
  
  failed.urls <- character()
 
  if( file.exists("FAILED-URLS.txt") ){
    failed.urls <- readLines( "FAILED-URLS.txt" ) 
  }
        
  return(failed.urls)
}
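
A hedged usage sketch, assuming batch.list is a named list of URL vectors prepared by the package's batching helpers (its construction is not shown in this file):

# batch.list: named list of URL vectors, one element per batch
failed.urls <- build_tables_parallel( batch.list, year = 2020 )

# failed URLs are read back from FAILED-URLS.txt (written by the logging helpers)
head( failed.urls )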


#' @title Error handling and messaging for parsapply_tables
#' @description Passes a group of batch IDs to the parallel sapply table-building function.
#' @details Logging and resource management for a subgroup of batches to prevent BATCHFILE read/write conflicts.
#' @param batch.ids A vector of batch IDs to run in this tranch.
#' @param cl A cluster object created by parallel::makeCluster().
#' @export
send_batch <- function(batch.ids, cl) {

  # Run in parallel with enhanced error handling per batch
  completed.batches <- parallel::parSapply(cl, X = batch.ids, FUN = function(batch.id) {
    tryCatch(
      {
        # Ensure each batch.id is handled correctly
        parsapply_tables(batch.id)
      },
      error = function(e) {

        # Capture details about the error
        message(sprintf("Error in batch.id: %s, year: %s", batch.id, year))
              
        # Log error in a file
        log_file <- "../ERROR-LOG.txt"
      
        error_msg <- 
          sprintf( "[%s] Error in batch.id: %s, year: %s - %s\n",
                   format( Sys.time(), "%Y-%m-%d %H:%M:%S" ),
                   batch.id, year, e$message )
                           
        cat( error_msg, file = log_file, append = TRUE )
        
        return(NULL)  # Return NULL for this batch, but continue processing others
      }
    )
  })
  
  
  if( length( completed.batches ) > 0 )
  {
    # remove tasks from the list
    purrr::walk( completed.batches, remove_batch )

    # Redirect output to log file
    build_log <- file("../BUILD-LOG.txt", open = "at" )
    sink( build_log, append = TRUE, type = "message")
    
    # Report progress 
    batch.seq <- paste0( completed.batches, collapse=" "  )
    timestamp <- format(Sys.time(), "%I:%M %p -- %b %d %Y") 
    timestamp <- paste0( "  >> ", timestamp, " -- " )
    msg <- paste0( timestamp, "COMPLETED ", batch.seq, "\n" )
    message( msg, appendLF = FALSE )  # use message() so the text is captured by the message sink
    flush.console()
    sink(type = "message")   # Restore the default message stream
    close(build_log)         # Close file connection
  }
  
  return(completed.batches)
}


 