R/jbd_Ctrans_chunker.R

Defines functions jbd_Ctrans_chunker

Documented in jbd_Ctrans_chunker

# This function was written by James Dorey to chunk the bdc_coordinates_transposed function
# to allow bigger datasets to be analysed without consuming too much RAM.
# This function was written on the 31st of May 2022. For questions, please email jbdorey[at]me.com
#' Wraps jbd_coordinates_transposed to identify  and fix transposed occurrences
#' 
#' Because the [BeeBDC::jbd_coordinates_transposed()] function is very RAM-intensive, this wrapper 
#' allows a user to specify chunk-sizes and only analyse a small portion of the occurrence data at a 
#' time. The prefix jbd_ is used to highlight the difference between this function and the original
#' [bdc::bdc_coordinates_transposed()].
#' This function will preferably use the countryCode column generated by 
#' [bdc::bdc_country_standardized()].
#'
#' @param data A data frame or tibble. Occurrence records as input.
#' @param lat Character. The column with latitude in decimal degrees. Default = "decimalLatitude".
#' @param lon Character. The column with longitude in decimal degrees. Default = "decimalLongitude".
#' @param country Character. The name of the column containing country names. Default = "country".
#' @param idcol Character. The column name with a unique record identifier. Default = "database_id".
#' @param countryCode Character. Identifies the column containing ISO-2 country codes 
#' Default = "countryCode".
#' @param sci_names Character. The column containing scientific names. Default = "scientificName".
#' @param border_buffer Numeric. The buffer, in decimal degrees, around points to help match them
#' to countries. Default = 0.2 (~22 km at equator).
#' @param save_outputs Logical. If TRUE, transposed occurrences will be saved to their own file.
#' @param stepSize Numeric. The number of occurrences to process in each chunk. Default = 1000000.
#' @param chunkStart Numeric. The chunk number to start from. This can be > 1 when you need to restart 
#' the function from a certain chunk; for example if R failed unexpectedly. 
#' @param progressiveSave Logical. If TRUE then the country output list will be saved between
#' each iteration so that `append` can be used if the function is stopped part way through.
#' @param path Character. The path to a file in which to save the 01_coordinates_transposed_
#' output.
#' @param append Logical. If TRUE, the function will look to append an existing file.
#' @param scale Passed to rnaturalearth's ne_countries().
#' Scale of map to return, one of 110, 50, 10 or 'small', 'medium', 'large'. Default = "large".
#' @param mc.cores Numeric. If > 1, the jbd_correct_coordinates function will run in parallel
#' using mclapply using the number of cores specified. If = 1 then it will be run using a serial
#' loop. NOTE: Windows machines must use a value of 1 (see ?parallel::mclapply). Additionally,
#' be aware that each thread can use large chunks of memory.
#'  Default = 1.#'
#' @return Returns the input data frame with a new column, coordinates_transposed, where FALSE = columns
#' that had coordinates transposed.
#' @export
#' 
#' @importFrom dplyr %>%
#'
#' @examples
#' if(requireNamespace("rnaturalearthdata")){
#' library(dplyr)
#'   # Import and prepare the data
#' data(beesFlagged)
#' beesFlagged <- beesFlagged %>% dplyr::select(!c(.val, .sea)) %>%
#'   # Cut down the dataset to un example quicker
#' dplyr::filter(dplyr::row_number() %in% 1:20)
#'   # Run the function
#' beesFlagged_out <- jbd_Ctrans_chunker(
#' # bdc_coordinates_transposed inputs
#' data = beesFlagged,
#' idcol = "database_id",
#' lat = "decimalLatitude",
#' lon = "decimalLongitude",
#' country = "country_suggested",
#' countryCode = "countryCode",
#' # in decimal degrees (~22 km at the equator)
#' border_buffer = 1, 
#' save_outputs = FALSE,
#' sci_names = "scientificName",
#' # chunker inputs
#' # How many rows to process at a time
#' stepSize = 1000000,  
#' # Start row
#' chunkStart = 1,  
#' # Progressively save the output between each iteration?
#' progressiveSave = FALSE,
#' path = tempdir(),
#' # If FALSE it may overwrite existing dataset
#' append = FALSE,
#'   # Users should select scale = "large" as it is more thoroughly tested
#' scale = "medium",
#' mc.cores = 1
#' ) 
#' table(beesFlagged_out$coordinates_transposed, useNA = "always")
#' } # END if require
#' 
jbd_Ctrans_chunker <- function(
    data = NULL,
    lat = "decimalLatitude",
    lon = "decimalLongitude",
    idcol = "databse_id",
    country = "country_suggested",
    countryCode = "countryCode",
    sci_names = "scientificName",
    border_buffer = 0.2, # in decimal degrees (~22 km at the equator)
    save_outputs = TRUE,
    
     # How many rows to process at a time
    stepSize = 1000000,
     # Start row
    chunkStart = 1,
    progressiveSave = TRUE,
    path = tempdir(),
     # If FALSE it may overwrite existing dataset
    append = TRUE,
    scale = "large",
    mc.cores = 1){
  database_id <- NULL
  
  requireNamespace("dplyr")
  
  #### 0.0 Prep ####
  startTime <- Sys.time()
    # Select the output file name to be saved as you run
  fileName <- paste("01_coordinates_transposed_", Sys.Date(), ".csv", sep = "")
    #### 0.1 nChuncks ####
   # Find the number of chunks needed to complete the run
  nChunks = ceiling(nrow(data)/stepSize)
  # Find the max nrow
  nrowMax <- nrow(data)
   # IF a run failed you can start again from the same spot using append = TRUE
  #### 0.2 Append ####
  if(append == TRUE){
    suppressWarnings({
      # Read in the Tranps_tibble csv
    Tranps_tibble = readr::read_csv(paste0(path, "/","Tranps_tibble.csv"), col_types = ColTypeR())
    })
     # set the chunkStart to the number of rows completed plus one
    chunkStart = nrow(Tranps_tibble) + 1
    nChunks = ceiling((nrow(data)-chunkStart)/stepSize)
  } # END append IF statement
  # The chunkEnd is the same as the stepSize initially, but the chunkEnd will change with each iteration
  # It will also differ if append == true based on where the run is at.
  chunkEnd = (chunkStart + stepSize) - 1
  
  #### 0.3 User text ####
  # Write user output
  writeLines(paste(" - Running chunker with:", "\n",
                   "stepSize = ", 
                   format(stepSize, big.mark=",",scientific=FALSE), "\n",
                   "chunkStart = ", 
                   format(chunkStart, big.mark=",",scientific=FALSE), "\n",
                   "chunkEnd = ", 
                   format(chunkEnd, big.mark=",",scientific=FALSE), "\n",
                   "append = ", append, 
                   sep = ""))
  #### 1.0 Run Loop ####
  # Loop - from chunkStart to the end, process rows in batches of chunkEnd
  for(i in 1:nChunks){
    # Select rows from chunkStart to chunkEnd
    loop_check_pf = data[chunkStart:chunkEnd,] %>%
      tidyr::drop_na(tidyselect::all_of(idcol)) %>%
        # Drop unused factors
      base::droplevels()

    # User output
    writeLines(paste(" - Starting chunk ", i, "...", "\n",
                     "From ",  
                     format(chunkStart, big.mark=",",scientific=FALSE), " to ", 
                     format(chunkEnd, big.mark=",",scientific=FALSE),
                     sep = ""))
      ##### 1.1 Function ####
    # Run the bdc_country_from_coordinates function from the BeeBDC package
    loop_check_pf <- jbd_coordinates_transposed(
      data = loop_check_pf,
      lat = lat,
      lon = lon,
      sci_names = sci_names,
      country = country,
      countryCode = countryCode,
      idcol = idcol,
      border_buffer = border_buffer,
      save_outputs = save_outputs,
      path = path,
      fileName = fileName,
      scale = scale,
      mc.cores = mc.cores)
    
    #### 1.2 Save + bind file ####
    # Save a smaller csv file with the database_id and country name to be matched later
    # For the first instance in the loop...
    if(i == 1 && append == FALSE){
      Tranps_tibble = dplyr::tibble(loop_check_pf)
    }else{
      Tranps_tibble = dplyr::bind_rows(Tranps_tibble, 
                                       loop_check_pf)
    }
    # Set chunkStart to be chunkEnd +1 for the next row
    chunkStart = chunkStart + stepSize
    chunkEnd = chunkEnd + stepSize
    # If chunkEnd surpasses nrowMax, then assign nrowMax.
    if(chunkEnd > nrowMax){
      chunkEnd = nrowMax
    }
    # Make room on the RAM by cleaning up the garbage
    # user output
      #### 1.3 User text ####
    # Print use output
    writeLines(paste(" - Finished chunk ", i, " of ", nChunks, ". ",
                     "Total records examined: ", 
                     format(nrow(Tranps_tibble), big.mark=",",scientific=FALSE),
                     sep = "") )
    # Save as a csv after each iteration
    if(progressiveSave == TRUE){
    readr::write_excel_csv(Tranps_tibble, file = paste0(path, "/", "Tranps_tibble.csv"))}
  } # END loop
  
  
  #### 2.0 Clean and return ####
  # Remove NA values
  Tranps_tibble <- Tranps_tibble %>%
    dplyr::filter(!is.na("database_id")) %>%
    # Remove any duplicates that have been introduced
    dplyr::distinct()
  
  
  endTime <- Sys.time()
  
  message(paste(
    " - Completed in ", 
    round(difftime(endTime, startTime), digits = 2 ),
    " ",
    units(round(endTime - startTime, digits = 2)),
    sep = ""))
  return(Tranps_tibble)
} # END function

Try the BeeBDC package in your browser

Any scripts or data that you put into this service are public.

BeeBDC documentation built on Nov. 4, 2024, 9:06 a.m.