This document demonstrates to parallelize weather/climate geospatial data processing with chopin
and what cases users may take advantage of parallel processing or not. We will cover the following formats:
We consider TerraClimate and PRISM data which have its own data format each. Parameter-elevation Regressions on Independent Slopes Model (PRISM) is a high-resolution (800-1,000 meters) gridded climate dataset available in the BIL (band interleaved by line) format which is readable with GDAL. TerraClimate is a high-resolution (5 km) gridded climate dataset in the NetCDF format which is readable with GDAL.
| Data | Source | Resolution | File format | |:------------:|:------------------------|:---------------|:------------| | TerraClimate | University of Idaho | 0.0417 degrees | NetCDF | | PRISM | Oregon State University | 0.0083 degrees | BIL |
The spatial resolution of TerraClimate data commensurates 5 km in the equator, whereas that of PRISM data is approximately 1 km. The data are available in the NetCDF format which is readable with GDAL. We will use the terra
package to read the data.
We will consider the populated places centroids in the mainland United States (i.e., excluding Alaska, Hawaii, and the territories). We will use the tigris
package to download the data.
| Data | Number of features | Source | Type | |:-------------------:|:----------------|:----------------|:----------------| | Census places | 31,377 | US Census Bureau | Polygon | | Block groups | 238,193 | US Census Bureau | Polygon | | Grid points in the southeastern US | 1,204,904 | Author, US Census Bureau (state polygons) | Point |
library(chopin) library(terra) library(sf) library(stars) library(future) library(future.mirai) library(parallelly) library(dplyr) library(tigris) library(tictoc) options(tigris_use_cache = TRUE, sf_use_s2 = FALSE)
We used a machine with 112 physical CPU cores with 750 GB of memory, but we used only a portion of the cores (up to 20) for the demonstration. No maximum possible memory usage was set. However, in typical HPC management systems, users are required to request the number of cores and memory for their jobs. The number of cores and memory capacity should be considered when users parallelize the extraction process.
# populated places state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID popplace <- lapply(target_states, function(x) tigris::places(x, year = 2022)) %>% do.call(rbind, .) saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")
state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID # download populated places options(tigris_use_cache = TRUE) popplace <- lapply(target_states, function(x) { tigris::places(year = 2022, state = x) }) popplace <- do.call(rbind, popplace) # generate circular buffers with 10 km radius popplacep <- sf::st_centroid(popplace, of_largest_polygon = TRUE) %>% sf::st_transform("EPSG:5070") popplacep2 <- sf::st_centroid(popplace, of_largest_polygon = TRUE) popplaceb <- sf::st_buffer(popplacep, dist = units::set_units(10, "km"))
TerraClimate data are provided in yearly NetCDF files, each of which contains monthly layers. We will read the data with the terra
package and preprocess the data to extract the annual mean and sum of the bands by types of columns.
For reproducibility, run the code below with our companion package amadeus
to download terraClimate data. Please note that it will take a while depending on the internet speed.
rlang::check_installed("amadeus") tcli_variables <- c( "aet", "def", "pet", "ppt", "q", "soil", "swe", "PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws" ) amadeus::download_terraclimate( variables = tcli_variables, year = c(2000, 2022), directory_to_save = "./input/terraClimate", acknowledgement = TRUE, download = TRUE, remove_command = TRUE )
# wbd ext_mainland <- c(xmin = -126, xmax = -64, ymin = 22, ymax = 51) ext_mainland <- terra::ext(ext_mainland) path_tc <- file.path("input", "terraClimate") path_tc_files <- list.files( path = path_tc, pattern = "*.nc$", full.names = TRUE ) path_tc_files
#> [1] "input/terraClimate/TerraClimate_aet_2000.nc" #> [2] "input/terraClimate/TerraClimate_aet_2001.nc" #> [truncated] #> [321] "input/terraClimate/TerraClimate_ws_2021.nc" #> [322] "input/terraClimate/TerraClimate_ws_2022.nc"
Fourteen variables are available in TerraClimate data. The table below is from the TerraClimate website.
| Variable | Description | Units | |:--------:|:------------|:------| | aet | Actual Evapotranspiration, monthly total | mm | | def | Climate Water Deficit, monthly total | mm | | PDSI | Palmer Drought Severity Index, at end of month | unitless | | pet | Potential evapotranspiration, monthly total | mm | | ppt | Precipitation, monthly total | mm | | q | Runoff, monthly total | mm | | soil | Soil Moisture, total column - at end of month | mm | | srad | Downward surface shortwave radiation | W/m2 | | swe | Snow water equivalent - at end of month | mm | | tmax | Max Temperature, average for month | C | | tmin | Min Temperature, average for month | C | | vap | Vapor pressure, average for month | kPa | | vpd | Vapor Pressure Deficit, average for month | kpa | | ws | Wind speed, average for month | m/s |
The variables can be categorized into two types: those that will be summed and those that will be averaged.
aet
, def
, pet
, ppt
, q
, soil
, and swe
.PDSI
, srad
, tmax
, tmin
, vap
, vpd
, and ws
.Following that rationale, we will preprocess the data by summing the first seven layers and averaging the rest of the layers. The code blocks below follow "split-apply-combine" strategy. Note that terra::tapp
aggregates layers by its attributes such as time or custom indices.
options(future.globals = FALSE) # some bands should be summed bandnames <- c( "aet", "def", "PDSI", "pet", "ppt", "q", "soil", "srad", "swe", "tmax", "tmin", "vap", "vpd", "ws" ) bandnames_sorted <- sort(bandnames) # single nc file, yearly aggregation by fun value # band for summation bandnames_sum <- c("aet", "def", "pet", "ppt", "q", "soil", "swe") # band for averaging bandnames_avg <- c("PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws") # mean: temporally marginal pixel mean (i.e., monthly -> yearly) # sum: temporally marginal pixel sum (i.e., monthly -> yearly) # Preprocessed data are stored in tictoc::tic("sum: 7 layers") netcdf_read_sum <- split(bandnames_sum, bandnames_sum) |> lapply(function(x) { grep(paste0("(", x, ")"), path_tc_files, value = TRUE) }) |> lapply(function(x) { terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "sum") }) netcdf_read_sum <- Reduce(c, netcdf_read_sum) tictoc::toc()
#> sum: 7 layers: 177.974 sec elapsed
names(netcdf_read_sum) <- paste0(rep(bandnames_sum, each = 23), "_", rep(2000:2022, 7)) netcdf_read_sum
#> class : SpatRaster #> dimensions : 696, 1488, 161 (nrow, ncol, nlyr) #> resolution : 0.04166667, 0.04166667 (x, y) #> extent : -126, -64, 22, 51 (xmin, xmax, ymin, ymax) #> coord. ref. : +proj=longlat +ellps=WGS84 +no_defs #> source(s) : memory #> names : aet_2000, aet_2001, aet_2002, aet_2003, ... #> min values : 22.9, 27.3, 11, 30.8, ... #> max values : 1270.9, 1366.6, 1399, 1411.1, ... #> time (years): 2000 to 2022
# tictoc::tic("mean: 7 layers") # netcdf_read_mean <- # split(bandnames_avg, bandnames_avg) |> # lapply(function(x) { # grep(paste0("(", x, ")"), path_tc_files, value = TRUE) # }) |> # lapply(function(x) { # terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "mean") # }) |> # Reduce(f = c, x = _) # tictoc::toc() # names(netcdf_read_mean) <- # sprintf("%s_%d", rep(bandnames_avg, each = 23), rep(2000:2022, 7)) # netcdf_read_mean
[!WARNING] Stacking raster data may take a long time and consume a large amount of memory depending on users' area of interest and data resolution. Users need to consider the memory capacity of the system before stacking rasters.
We have 14 data elements for 23 years with 12 months each. Below demonstrates the summary of the data layers that were averaged at circular buffer polygons with 10 kilometers (10,000 meters) radius. To leverage multiple cores in your machine, run future::availableCores()
to check the number of cores and set the number of workers in future::plan
accordingly. Typically, there are two logical cores in each physical core in modern central processing units. The number of workers should be set to up to the number of physical cores in the machine for optimal performance. The example below uses future::multicore
which shares the memory across the workers.
tic("multi threads (grid)") future::plan(future::multicore, workers = 8L) grid_init <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 1e4, nx = 4L, ny = 2L ) multi_grid <- chopin::par_grid( grids = grid_init, fun_dist = extract_at, y = popplacep2, x = netcdf_read_sum, id = "GEOID", func = "mean", radius = 1e4 ) toc()
#> multi threads (grid): 16.668 sec elapsed
system.time( multi <- grep( paste0("(", paste(bandnames_sum, collapse = "|"), ")"), path_tc_files, value = TRUE ) %>% chopin::par_multirasters( filenames = ., fun_dist = extract_at, y = popplacep2, x = rast(), # ignored id = "GEOID", func = "mean", radius = 1e4 ) )
#> user system elapsed #> 5377.495 231.654 762.147
future::plan(future::sequential) # single thread tic("single thread") single <- exactextractr::exact_extract( netcdf_read_sum, sf::st_as_sf(popplaceb), fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = c("GEOID"), progress = FALSE ) toc()
#> single thread: 21.161 sec elapsed
[!CAUTION] All Windows users and RStudio users (all platforms) will not be able to use
future::multicore
due to the restriction infuture
package. Pleasefuture::multisession
instead and note that this option will runs slower thanfuture::multicore
case.
PRISM data are provided in monthly 30-year normal BIL files. Assuming that a user wants to summarize 30-year normal precipitation at 10 kilometers circular buffers of the geogrpahic centroids of US Census Places (from p.26), we demonstrate the extraction process with the chopin
package.
# populated places # mainland states state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID popplace <- lapply(target_states, function(x) tigris::places(x, year = 2022)) %>% do.call(rbind, .) saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")
bils <- list.files("input", "bil$", recursive = TRUE, full.names = TRUE) bilssds <- terra::rast(bils[-13]) popplace2 <- sf::st_transform(popplace, crs = terra::crs(bilssds)) popplaceb2 <- sf::st_transform(popplaceb, crs = terra::crs(bilssds))
[!IMPORTANT]
chopin::par_pad_grid
works the best with point datasets since each grid will clip the input features when parallelized. Polygon inputs will result in duplicate values in the output and lead to take longer to complete.
In the same vein as the TerraClimate data, we will use the chopin
package to parallelize the extraction process with grid strategy.
exgrid <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 1e4, nx = 4L, ny = 2L ) future::plan(future::multicore, workers = 8L) system.time( exmulti <- chopin::par_grid( exgrid, fun_dist = extract_at, y = popplacep2, x = bilssds, radius = units::set_units(1e4, "meter"), id = "GEOID", func = "mean" ) )
#> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8
#> user system elapsed #> 33.090 13.254 14.571
system.time( exsingle <- exactextractr::exact_extract( bilssds, popplaceb2, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) )
#> user system elapsed #> 19.347 1.927 21.716
future::plan(future::sequential)
Examples above showed that the difference between the parallelized and single-threaded extraction process is not significant. We will increase the buffer size to 50 kilometers and compare the performance of the parallelized and single-threaded extraction process.
# make buffers popplaceb5 <- sf::st_buffer(popplacep, dist = units::set_units(50, "km")) %>% sf::st_transform(terra::crs(bilssds)) system.time( exsingle5 <- exactextractr::exact_extract( bilssds, popplaceb5, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) )
#> user system elapsed #> 140.373 2.302 144.552
exgrid5k <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 5e4, nx = 4L, ny = 2L ) future::plan(future::multicore, workers = 8L) system.time( exmulti5k <- chopin::par_grid( exgrid5k, fun_dist = extract_at, y = popplacep2, x = bilssds, radius = 5e4, id = "GEOID", func = "mean" ) )
#> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8 #> user system elapsed #> 152.344 11.003 60.409
future::plan(future::sequential)
[!NOTE] The example above used strings of raster file paths for
surf
argument inchopin::extract_at_buffer
.terra::rast
at multiple file paths will return aSpatRaster
with multiple layers only if the rasters have the same extent and resolution.
This example uses 1,204,934 1-km grid points in the southeastern United States to summarize seven layers of TerraClimate.
## generate 1km grid points in the southeastern US States stt <- tigris::states(year = 2020) targ_states <- c("NC", "SC", "GA", "FL", "AL", "MS", "TN", "LA", "AR") stt_targ <- stt[stt$STUSPS %in% targ_states, ]
stt_t <- sf::st_transform(stt_targ, "EPSG:5070") stt_g <- sf::st_sample(stt_t, type = "regular", 1204934) stt_g <- sf::st_as_sf(stt_g) sf::st_geometry(stt_g) <- "geometry" stt_g$pid <- seq(1, nrow(stt_g))
stt_gb <- sf::st_buffer(stt_g, units::set_units(10, "km")) tic() single_2m <- exactextractr::exact_extract( netcdf_read_sum, stt_gb, fun = "mean", stack_apply = TRUE, force_df = TRUE, max_cells_in_memory = 2.14e9, progress = FALSE ) toc()
#> 855.908 sec elapsed
stt_gbg <- chopin::par_pad_grid( stt_g, mode = "grid", padding = 5e3, nx = 5L, ny = 5L ) future::plan(future::multicore, workers = 8L) system.time( stt5k <- chopin::par_grid( stt_gbg, fun_dist = extract_at, y = stt_g, x = netcdf_read_sum, id = "pid", radius = 1e4, func = "mean", max_cells = 2e7 ) )
#> user system elapsed #> 6.745 4.102 434.041
future::plan(future::sequential)
Using PRISM data, the example below summarizes the mean values of each data element at census block groups.
# set state=NULL and cb=TRUE will download the block groups for the entire US bg <- tigris::block_groups(state = NULL, cb = TRUE, year = 2020) sf::write_sf(bg, file.path("input", "Blockgroups_2020.gpkg"))
## extract prism by par_hierarchy bgsf <- sf::st_read("input/Blockgroups_2020.gpkg")
#> Reading layer `Blockgroups_2020' from data source #> `/ddn/gs1/home/songi2/projects/chopin/input/Blockgroups_2020.gpkg' #> using driver `GPKG' #> Simple feature collection with 242298 features and 11 fields #> Geometry type: MULTIPOLYGON #> Dimension: XY #> Bounding box: xmin: -179.1467 ymin: -14.5487 xmax: 179.7785 ymax: 71.38782 #> Geodetic CRS: NAD83
bgsf_main <- bgsf %>% dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69")) ## extract prism at bg system.time( exsingle <- exactextractr::exact_extract( bilssds, bgsf_main, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) )
#> user system elapsed #> 60.387 2.059 64.147
nmain <- c("AS", "AK", "HI", "PR", "VI", "GU", "MP") stt_nmain <- stt[!stt$STUSPS %in% nmain, ] future::plan(future.mirai::mirai_multisession, workers = 20L) system.time( exhierarchy <- chopin::par_hierarchy( bgsf_main, regions_id = "STATEFP", fun_dist = extract_at, y = bgsf_main, x = bilssds, id = "GEOID" ) )
#> user system elapsed #> 2.205 0.558 158.905
future::plan(future::sequential)
We demonstrate the extraction process with the CropScape dataset which has a resolution of 30 meters. In this case, we use frac
option of exact_extract
which tabulates the fraction of the area of the cell category that is covered by the polygon after accounting for partial coverage of polygon segments over raster cells.
cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif") system.time( bgsf_cdl_single <- exactextractr::exact_extract( cdl, bgsf_main, fun = "frac", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) )
#> user system elapsed #> 1013.411 44.322 1369.053
For balancing computation time, we will split the block groups into nine subsets to parallelize. Note that mode = "grid_quantile"
is used in par_pad_grid
to balance the number of block groups per grid. When input
argument of par_pad_grid
is polygons, a few polygons will have duplicate rows in the output data.frame since block groups overlapping each grid will be selected.
cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif") bgsf <- sf::st_read("input/Blockgroups_2020.gpkg") bgsf_main <- bgsf %>% dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69")) # balancing the number of features assigned per workers # by splitting block groups by splitting centroids bgsf_9grids_plain <- bgsf_main %>% chopin::par_pad_grid( mode = "grid", nx = 3L, ny = 3L, padding = 1e4 ) bgsf_9grids <- bgsf_main %>% chopin::par_pad_grid( mode = "grid_quantile", padding = 1e4, quantiles = chopin::par_def_q(3L) ) future::plan(future.mirai::mirai_multisession, workers = 9L) # Note that bgsf_9grids were converted to sf # for exporting the objects to the parallel workers system.time( bgsf_cdl_par <- chopin::par_grid( lapply(bgsf_9grids, sf::st_as_sf), fun_dist = extract_at, y = bgsf_main, x = cdl, id = "GEOID", func = "frac", max_cells = 2e7 ) )
#> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8 #> Your input function was successfully run at CGRIDID: 9 #> user system elapsed #> 6.632 1.821 347.638
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.