#' Data ingest
#'
#' Ingests data for site scale simulations with rsofun (or any other Dynamic
#' Vegetation Model).
#'
#' @param siteinfo A data frame containing site meta info. Required columns are:
#' \code{"sitename", "date_start", "date_end", "lon", "lat", "elv"}.
#' @param source A character used as identifiyer for the type of data source
#' (e.g., \code{"fluxnet"}). See vignette for a full description of available
#' options.
#' @param getvars A named list of characters specifying the variable names in
#' the source dataset corresponding to standard names \code{"temp"}
#' for temperature, \code{"prec"} for precipitation, \code{"patm"} for
#' atmospheric pressure, \code{"vpd"} for vapour pressure deficit,
#' \code{"netrad"} for net radiation, \code{"swin"} for shortwave incoming
#' radiation, \code{"lwin"} for longwave incoming radiation, \code{"wind"}
#' for wind.
#' @param dir A character specifying the directory where data is located.
#' @param settings A list of additional settings used for reading original files.
#' @param timescale A character or vector of characters, specifying the time
#' scale of data used from the respective source (if multiple time scales are
#' available, otherwise is disregarded).
#' @param parallel A logical specifying whether ingest is run as parallel jobs
#' for each site. This option is only available for \code{source = "modis"}
#' and requires argument \code{ncores} to be set.
#' @param ncores An integer specifying the number of cores for parallel runs of
#' ingest per site. Required only if \code{parallel = TRUE}
#' @param find_closest A logical specifying whether to extract data from the
#' closest gridcell with data if no data is available for the specified
#' location. Defaults to \code{FALSE}.
#' @param verbose if \code{TRUE}, additional messages are printed.
#'
#' @return A named list of data frames (tibbles) containing input data for each
#' site is returned.
#' @import purrr dplyr
#' @importFrom rlang :=
#' @export
#'
#' @examples
#' \dontrun{
#' inputdata <- prepare_input_sofun(
#' settings_input = settings_input,
#' settings_sims = settings_sims,
#' verwrite_climate = FALSE,
#' verbose = TRUE )
#' }
ingest <- function(
siteinfo,
source,
getvars = c(),
dir = NULL,
settings = NULL,
timescale = "d",
parallel = FALSE,
ncores = NULL,
find_closest = FALSE,
verbose = FALSE
){
# CRAN compliance, declaring unstated variables
sitename <- lon <- lat <- date_start <- date_end <- problem <-
year_start_tmp <- x <- y <- lat_orig <- success <- elv <- patm <-
patm_base <-patm_mean <- month <- tavg <-temp <- temp_fine <-
tmax <- tmax_fine <- tmin <- tmin_fine <- prec <- prec_fine <-
days_in_month <- rain <- snow <- srad <- srad_fine <- ppfd <-
ppfd_fine <- wind <- wind_fine <- qair <- vap <- vapr <- vapr_fine <-
ilon <- data <- yy <- mm <- co2_avg <- year <- . <- bias <- NULL
# Check: all sites are distinct wrt name, lon and lat
if (nrow(siteinfo) != nrow(dplyr::distinct(siteinfo, sitename, lon, lat))){
stop("Non-distinct sites present w.r.t. name, lon, and lat.")
}
# Check dates and years for time series types
if (!(source %in% c(
"hwsd",
"etopo1",
"stocker23",
"wwf",
"soilgrids",
"wise",
"gsde",
"worldclim"
))
) {
# complement dates information
if (!("year_start" %in% names(siteinfo))){
if ("date_start" %in% names(siteinfo)){
siteinfo <- siteinfo %>%
mutate(year_start = lubridate::year(date_start))
} else {
stop("ingest(): Columns 'year_start' and 'date_start' missing
in object provided by argument 'siteinfo'")
}
}
if (!("year_end" %in% names(siteinfo))){
if ("date_end" %in% names(siteinfo)){
siteinfo <- siteinfo %>%
mutate(year_end = lubridate::year(date_end))
} else {
stop("ingest(): Columns 'year_end' and 'date_end' missing
in object provided by argument 'siteinfo'")
}
}
if (!("date_start" %in% names(siteinfo))){
if ("year_start" %in% names(siteinfo)){
siteinfo <- siteinfo %>%
mutate(
date_start = lubridate::ymd(paste0(as.character(year_start),
"-01-01")
)
)
} else {
stop("ingest(): Columns 'year_start' and 'date_start' missing
in object provided by argument 'siteinfo'")
}
}
if (!("date_end" %in% names(siteinfo))){
if ("year_end" %in% names(siteinfo)){
siteinfo <- siteinfo %>%
mutate(
date_end = lubridate::ymd(paste0(as.character(year_end),
"-12-31")
)
)
} else {
stop("ingest(): Columns 'year_end' and 'date_end' missing
in object provided by argument 'siteinfo'")
}
}
# check start < end
if (siteinfo %>%
mutate(problem = year_start > year_end) %>%
pull(problem) %>%
any()){
warning("At least one case found where year_start > year_end.
The are exchanged now")
siteinfo <- siteinfo %>%
mutate(year_start_tmp = ifelse(year_start > year_end,
year_end,
year_start)) %>%
mutate(year_end = ifelse(year_start > year_end,
year_start,
year_end)) %>%
mutate(year_start = year_start_tmp) %>%
dplyr::select(-year_start_tmp) %>%
mutate(
date_start = lubridate::ymd(paste0(as.character(year_start),
"-01-01")
)
) %>%
mutate(
date_end = lubridate::ymd(paste0(as.character(year_end),
"-12-31")
)
)
}
}
if (source == "fluxnet"){
# Get data from sources given by site
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~ingest_bysite(
siteinfo$sitename[.],
source = source,
getvars = getvars,
dir = dir,
settings = settings,
timescale = timescale,
year_start = lubridate::year(siteinfo$date_start[.]),
year_end = lubridate::year(siteinfo$date_end[.]),
verbose = verbose
)
) %>%
bind_rows()
} else if (
source == "cru" ||
source == "watch_wfdei" ||
source == "ndep" ||
source == "wfde5"
) {
# Get data from global fields
# special treatment of dates when bias correction is applied
if (!identical(NULL, settings$correct_bias)){
if (settings$correct_bias == "worldclim"){
# save data frame with required dates for
# all sites (may be different by site)
ddf_dates <- purrr::map(
as.list(seq(nrow(siteinfo))),
~ingestr::init_dates_dataframe(
lubridate::year(siteinfo$date_start[.]),
lubridate::year(siteinfo$date_end[.]),
noleap = TRUE,
timescale = timescale))
names(ddf_dates) <- siteinfo$sitename
ddf_dates <- ddf_dates %>%
bind_rows(.id = "sitename")
year_start_wc <- 1970
year_end_wc <- 2000
if (source == "watch_wfdei"){
message(
"Beware: WorldClim data is for years 1970-2000.
Therefore WATCH_WFDEI data is ingested for 1979-(at least) 2000.")
year_start_wc <- 1979 # no earlier years available
siteinfo <- siteinfo %>%
mutate(year_start = ifelse(year_start < year_start_wc, year_start, year_start_wc),
year_end = ifelse(year_end > year_end_wc, year_end, year_end_wc))
} else if (source == "wfde5"){
message(
"Beware: WorldClim data is for years 1970-2000.
Therefore WFDE5 data is ingested for 1979-(at least) 2000.")
year_start_wc <- 1979 # no earlier years available
siteinfo <- siteinfo %>%
mutate(year_start = ifelse(year_start < year_start_wc, year_start, year_start_wc),
year_end = ifelse(year_end > year_end_wc, year_end, year_end_wc))
} else if (source == "cru"){
message(
"Beware: WorldClim data is for years 1970-2000.
Therefore CRU data is ingested for 1970-(at least) 2000.")
siteinfo <- siteinfo %>%
mutate(year_start = ifelse(year_start < year_start_wc,
year_start, year_start_wc),
year_end = ifelse(year_end > year_end_wc,
year_end, year_end_wc))
}
}
}
# this returns a flat data frame with data from all sites
ddf <- ingest_globalfields(
siteinfo,
source = source,
dir = dir,
getvars = getvars,
timescale = timescale,
verbose = FALSE
)
if (find_closest){
# check if data was extracted for all sites (may be located over ocean)
sites_missing <- ddf %>%
group_by(sitename) %>%
summarise(across(tidyselect::vars_select_helpers$where(is.double),
~sum(!is.na(.x)))) %>%
dplyr::filter(across(c(-sitename, -date), ~ .x == 0)) %>%
pull(sitename)
if (length(sites_missing) > 0 & !identical(NULL, settings$correct_bias)){
# determine closest cell with non-NA
if (source == "watch_wfdei"){
path <- paste0(dir, "/WFDEI-elevation.nc")
} else if (source == "cru"){
path <- paste0(dir, "/elv_cru_halfdeg.nc")
}
if (!file.exists(path)) {
stop(paste0("Looking for elevation file for determining
closest land cell, but not found under ", path))
}
rasta <- raster::raster(path)
siteinfo_missing <- siteinfo %>%
dplyr::filter(sitename %in% sites_missing)
siteinfo_missing <- siteinfo_missing %>%
dplyr::select(x = lon, y = lat) %>%
mutate(
lon = raster::xFromCell(
rasta,
which.min(replace(raster::distanceFromPoints(rasta, .),
is.na(rasta), NA))[1]),
lat = raster::yFromCell(
rasta, which.min(replace(raster::distanceFromPoints(rasta, .),
is.na(rasta), NA))[1])) %>%
rename(lon_orig = x, lat_orig = y) %>%
bind_cols(siteinfo_missing %>% dplyr::select(-lon, -lat)) %>%
mutate(success = ifelse(abs(lat-lat_orig)>1.0, FALSE, TRUE))
# extract again for sites with missing data
ddf_missing <- ingest_globalfields(siteinfo_missing,
source = source,
dir = dir,
getvars = getvars,
timescale = timescale,
verbose = FALSE)
if (sum(!siteinfo_missing$success)>0){
warning(
"No land found within 1 degree latitude for the following sites:
Consider excluding them.")
print(siteinfo_missing %>% dplyr::filter(!success))
}
# replace site with adjusted location
ddf <- ddf %>%
dplyr::filter(!(sitename %in% sites_missing)) %>%
bind_rows(ddf_missing)
}
}
# bias-correct atmospheric pressure - per default
if (!is.null(getvars)){
if ("patm" %in% getvars){
df_patm_base <- siteinfo %>%
dplyr::select(sitename, elv) %>%
mutate(patm_base = calc_patm(elv))
ddf <- ddf %>%
group_by(sitename) %>%
summarise(patm_mean = mean(patm, na.rm = TRUE)) %>%
left_join(df_patm_base, by = "sitename") %>%
mutate(scale = patm_base / patm_mean) %>%
right_join(ddf, by = "sitename") %>%
mutate(patm = patm * scale) %>%
dplyr::select(-patm_base, -elv, -patm_mean, -scale)
}
}
if (!identical(NULL, settings$correct_bias)){
if (settings$correct_bias == "worldclim"){
# Bias correction using WorldClim data
getvars_wc <- c()
if ("temp" %in% getvars){getvars_wc <- c(getvars_wc, "tavg")}
if ("tmin" %in% getvars){getvars_wc <- c(getvars_wc, "tmin")}
if ("tmax" %in% getvars){getvars_wc <- c(getvars_wc, "tmax")}
if ("prec" %in% getvars){getvars_wc <- c(getvars_wc, "prec")}
if ("ppfd" %in% getvars){getvars_wc <- c(getvars_wc, "srad")}
if ("wind" %in% getvars){getvars_wc <- c(getvars_wc, "wind")}
if ("vpd" %in% getvars){getvars_wc <- c(getvars_wc, "vapr", "tmin", "tmax")}
if ("swin" %in% getvars){rlang::inform("Bias Correction: Not yet implemented for swin.")}
if ("lwin" %in% getvars){rlang::inform("Bias Correction: Not yet implemented for lwin.")}
df_fine <- ingest_globalfields(siteinfo,
source = "worldclim",
dir = settings$dir_bias,
getvars = NULL,
timescale = NULL,
verbose = FALSE,
layer = unique(getvars_wc))
# Bias correction for temperature: subtract difference
if ("tavg" %in% getvars_wc){
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("tavg_")) %>%
tidyr::pivot_longer(
cols = starts_with("tavg_"),
names_to = "month",
values_to = "tavg",
names_prefix = "tavg_") %>%
mutate(month = as.integer(month)) %>%
rename(temp_fine = tavg) %>%
right_join(ddf %>%
dplyr::filter(
lubridate::year(date) %in% year_start_wc:year_end_wc
) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(temp = mean(temp, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(bias = temp - temp_fine) %>%
dplyr::select(-temp, -temp_fine)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, bias),
by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(temp = ifelse(is.na(bias), temp, temp - bias)) %>%
dplyr::select(-bias, -month)
}
# Bias correction for temperature: subtract difference
if ("tmin" %in% getvars_wc){
if (source == "cru"){ # no tmin or tmax in wwfd
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("tmin_")) %>%
tidyr::pivot_longer(
cols = starts_with("tmin_"),
names_to = "month",
values_to = "tmin",
names_prefix = "tmin_") %>%
mutate(month = as.integer(month)) %>%
rename(tmin_fine = tmin) %>%
right_join(ddf %>%
dplyr::filter(
lubridate::year(date) %in% year_start_wc:year_end_wc
) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(tmin = mean(tmin, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(bias = tmin - tmin_fine) %>%
dplyr::select(-tmin, -tmin_fine)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(
df_bias %>% dplyr::select(sitename, month, bias),
by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(tmin = ifelse(is.na(bias), tmin, tmin - bias)) %>%
dplyr::select(-bias, -month)
}
}
# Bias correction for temperature: subtract difference
if ("tmax" %in% getvars_wc){
if (source == "cru"){ # no tmin or tmax in wwfd
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("tmax_")) %>%
tidyr::pivot_longer(
cols = starts_with("tmax_"),
names_to = "month",
values_to = "tmax",
names_prefix = "tmax_") %>%
mutate(month = as.integer(month)) %>%
rename(tmax_fine = tmax) %>%
right_join(ddf %>%
dplyr::filter(
lubridate::year(date) %in% year_start_wc:year_end_wc
) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(tmax = mean(tmax, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(bias = tmax - tmax_fine) %>%
dplyr::select(-tmax, -tmax_fine)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(
df_bias %>% dplyr::select(sitename, month, bias),
by = c("sitename", "month")
) %>%
arrange(sitename, date) %>%
mutate(tmax = ifelse(is.na(bias), tmax, tmax - bias)) %>%
dplyr::select(-bias, -month)
}
}
# Bias correction for precipitation: scale by ratio (snow and rain equally)
if ("prec" %in% getvars_wc){
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("prec_")) %>%
tidyr::pivot_longer(cols = starts_with("prec_"), names_to = "month", values_to = "prec", names_prefix = "prec_") %>%
mutate(month = as.integer(month)) %>%
rename(prec_fine = prec) %>%
mutate(prec_fine = prec_fine / lubridate::days_in_month(month)) %>% # mm/month -> mm/d
mutate(prec_fine = prec_fine / (60 * 60 * 24)) %>% # mm/d -> mm/sec
right_join(ddf %>%
dplyr::filter(lubridate::year(date) %in% year_start_wc:year_end_wc) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(prec = mean(prec, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(scale = prec_fine / prec) %>%
dplyr::select(sitename, month, scale)
# correct bias by month
if (source == "watch_wfdei" || source == "wfde5"){
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, scale), by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(scale = ifelse(is.infinite(scale), 0, scale)) %>%
mutate(prec = ifelse(is.na(scale), prec, prec * scale),
rain = ifelse(is.na(scale), rain, rain * scale),
snow = ifelse(is.na(scale), snow, snow * scale)) %>%
dplyr::select(-scale, -month)
} else {
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, scale), by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(scale = ifelse(is.infinite(scale), 0, scale)) %>%
mutate(prec = ifelse(is.na(scale), prec, prec * scale)) %>%
dplyr::select(-scale, -month)
}
}
# Bias correction for shortwave radiation: scale by ratio
if ("srad" %in% getvars_wc){
kfFEC <- 2.04
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("srad_")) %>%
tidyr::pivot_longer(cols = starts_with("srad_"), names_to = "month", values_to = "srad", names_prefix = "srad_") %>%
mutate(month = as.integer(month)) %>%
rename(srad_fine = srad) %>%
mutate(ppfd_fine = 1e3 * srad_fine * kfFEC * 1.0e-6 / (60 * 60 * 24) ) %>% # kJ m-2 day-1 -> mol m−2 s−1 PAR
right_join(ddf %>%
dplyr::filter(lubridate::year(date) %in% year_start_wc:year_end_wc) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(ppfd = mean(ppfd, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(scale = ppfd_fine / ppfd) %>%
dplyr::select(sitename, month, scale)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, scale), by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(scale = ifelse(is.infinite(scale), 0, scale)) %>%
mutate(ppfd = ifelse(is.na(scale), ppfd, ppfd * scale)) %>%
dplyr::select(-scale, -month)
}
# Bias correction for atmospheric pressure: scale by ratio
if ("wind" %in% getvars_wc){
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("wind_")) %>%
tidyr::pivot_longer(cols = starts_with("wind_"),
names_to = "month",
values_to = "wind",
names_prefix = "wind_") %>%
mutate(month = as.integer(month)) %>%
rename(wind_fine = wind) %>%
right_join(ddf %>%
dplyr::filter(
lubridate::year(date) %in% year_start_wc:year_end_wc
) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(wind = mean(wind, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(scale = wind_fine / wind) %>%
dplyr::select(sitename, month, scale)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, scale),
by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(scale = ifelse(is.infinite(scale), 0, scale)) %>%
mutate(wind = ifelse(is.na(scale), wind, wind * scale)) %>%
dplyr::select(-scale, -month)
}
# Bias correction for relative humidity (actually vapour pressure): scale
if ("vapr" %in% getvars_wc){
# calculate vapour pressure from specific humidity - needed for bias correction with worldclim data
if (source == "watch_wfdei" || source == "wfde5"){
# specific humidity (qair, g g-1) is read, convert to vapour pressure (vapr, Pa)
ddf <- ddf %>%
rowwise() %>%
dplyr::mutate(
vapr = calc_vp(qair = qair,
patm = patm)
) %>%
ungroup()
} else if (source == "cru"){
# vapour pressure is read from file, convert from hPa to Pa
ddf <- ddf %>%
dplyr::mutate(vapr = 1e2 * vap) %>%
dplyr::select(-vap)
}
df_bias <- df_fine %>%
dplyr::select(sitename, starts_with("vapr_")) %>%
tidyr::pivot_longer(
cols = starts_with("vapr_"),
names_to = "month",
values_to = "vapr",
names_prefix = "vapr_") %>%
mutate(month = as.integer(month)) %>%
rename(vapr_fine = vapr) %>%
mutate(vapr_fine = vapr_fine * 1e3) %>% # kPa -> Pa
right_join(ddf %>%
dplyr::filter(lubridate::year(date) %in% year_start_wc:year_end_wc) %>%
mutate(month = lubridate::month(date)) %>%
group_by(sitename, month) %>%
summarise(vapr = mean(vapr, na.rm = TRUE)),
by = c("sitename", "month")) %>%
mutate(scale = vapr_fine / vapr) %>%
dplyr::select(sitename, month, scale)
# correct bias by month
ddf <- ddf %>%
mutate(month = lubridate::month(date)) %>%
left_join(df_bias %>% dplyr::select(sitename, month, scale), by = c("sitename", "month")) %>%
arrange(sitename, date) %>%
mutate(scale = ifelse(is.infinite(scale), 0, scale)) %>%
mutate(vapr = ifelse(is.na(scale), vapr, vapr * scale)) %>%
dplyr::select(-scale, -month)
}
# Calculate vapour pressure deficit from specific humidity
if ("vpd" %in% getvars){
if (source == "watch_wfdei" || source == "wfde5"){
# use daily mean temperature
ddf <- ddf %>%
rowwise() %>%
dplyr::mutate(
vapr = calc_vp(
qair = qair,
patm = patm
),
vpd = calc_vpd(eact = vapr, tc = temp)) %>%
ungroup()
} else if (source == "cru"){
# use daily minimum and maximum temperatures
ddf <- ddf %>%
rowwise() %>%
dplyr::mutate(
vpd = calc_vpd(eact = vapr, tmin = tmin, tmax = tmax)
) %>%
ungroup()
}
}
# keep only required dates
ddf <- ddf %>%
right_join(ddf_dates, by = c("sitename", "date"))
}
} else {
# Calculate vapour pressure deficit from specific humidity
# this calculates this variable for cases where there is
# no bias correction
if ("vpd" %in% getvars){
if (source == "watch_wfdei" || source == "wfde5"){
# use daily mean temperature
ddf <- ddf %>%
rowwise() %>%
dplyr::mutate(
vapr = calc_vp(qair = qair, patm = patm),
vpd = calc_vpd(eact = vapr, tc = temp)
) %>%
ungroup()
} else if (source == "cru"){
# use daily minimum and maximum temperatures
ddf <- ddf %>%
rowwise() %>%
dplyr::mutate(
vpd = calc_vpd(eact = vapr, tmin = tmin, tmax = tmax)
) %>%
ungroup()
}
}
}
} else if (source == "gee"){
# Get data from the remote server
# Define years covered based on site meta info:
# take all years used for at least one site.
year_start <- siteinfo %>%
pull(year_start) %>%
min()
year_end <- siteinfo %>%
pull(year_end) %>%
max()
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~ingest_gee_bysite(
slice(siteinfo, .),
start_date = paste0(as.character(year_start), "-01-01"),
end_date = paste0(as.character(year_end), "-12-31"),
overwrite_raw = settings$overwrite_raw,
overwrite_interpol = settings$overwrite_interpol,
band_var = settings$band_var,
band_qc = settings$band_qc,
prod = settings$prod,
prod_suffix = settings$prod_suffix,
varnam = settings$varnam,
productnam = settings$productnam,
scale_factor = settings$scale_factor,
period = settings$period,
python_path = settings$python_path,
gee_path = settings$gee_path,
data_path = settings$data_path,
method_interpol = settings$method_interpol,
keep = settings$keep
)
)
} else if (source == "modis"){
# Get data from the remote server
if (parallel){
if (is.null(ncores)){
stop("Aborting. Please provide number of cores for parallel jobs.")
}
cl <- multidplyr::new_cluster(ncores) %>%
multidplyr::cluster_assign(settings = settings) %>%
multidplyr::cluster_library(
c("dplyr",
"purrr",
"rlang",
"ingestr",
"readr",
"lubridate",
"MODISTools",
"tidyr"))
# distribute to cores, making sure all data from
# a specific site is sent to the same core
ddf <- tibble(ilon = seq(nrow(siteinfo))) %>%
multidplyr::partition(cl) %>%
dplyr::mutate(data = purrr::map( ilon,
~ingest_modis_bysite(
slice(siteinfo, .),
settings))) %>%
collect() %>%
tidyr::unnest(data)
} else {
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~ingest_modis_bysite(
slice(siteinfo, .),
settings
)
)
}
} else if (source == "co2_mlo"){
# Get CO2 data year, independent of site
# if 'dir' is provided, try reading from existing file, otherwise download
path <- paste0(dir, "/df_co2_mlo.csv")
if (!identical(NULL, dir)){
if (file.exists(path)){
df_co2 <- readr::read_csv(path)
} else {
df_co2 <- climate::meteo_noaa_co2() %>%
dplyr::select(year = yy, month = mm, co2_avg)
readr::write_csv(df_co2, file = path)
}
} else {
df_co2 <- climate::meteo_noaa_co2() %>%
dplyr::select(year = yy, month = mm, co2_avg)
}
# aggregate to annual means
df_co2 <- df_co2 %>%
group_by(year) %>%
summarise(co2 = mean(co2_avg, na.rm = TRUE))
# expand to data frame for each site
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~expand_co2_bysite(
df_co2,
sitename = siteinfo$sitename[.],
year_start = lubridate::year(siteinfo$date_start[.]),
year_end = lubridate::year(siteinfo$date_end[.]),
timescale = timescale
)
)
} else if (source == "co2_cmip"){
# Get CO2 data year, independent of site
# if 'dir' is provided, try reading from existing file, otherwise download
path <- paste0(dir, "/cCO2_rcp85_const850-1765.csv")
if (file.exists(path)){
df_co2 <- readr::read_csv(path)
} else {
stop(
"File cCO2_rcp85_const850-1765.csv must be available in directory
specified by 'dir'."
)
}
# expand to data frame for each site
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~expand_co2_bysite(
df_co2,
sitename = siteinfo$sitename[.],
year_start = lubridate::year(siteinfo$date_start[.]),
year_end = lubridate::year(siteinfo$date_end[.]),
timescale = timescale
)
)
} else if (source == "fapar_unity"){
# Assume fapar = 1 for all dates
ddf <- purrr::map(
as.list(seq(nrow(siteinfo))),
~expand_bysite(
sitename = siteinfo$sitename[.],
year_start = lubridate::year(siteinfo$date_start[.]),
year_end = lubridate::year(siteinfo$date_end[.]),
timescale = timescale
) %>%
mutate(fapar = 1.0)
)
} else if (source == "etopo1"){
# Get ETOPO1 elevation data. year_start and year_end not required
ddf <- ingest_globalfields(
siteinfo,
source = source,
dir = dir,
getvars = NULL,
timescale = NULL,
verbose = FALSE
)
} else if (source == "stocker23"){
# Get root zone water storage capacity data. year_start and year_end not required
ddf <- ingest_globalfields(
siteinfo,
source = source,
dir = dir,
getvars = NULL,
timescale = NULL,
verbose = FALSE
)
} else if (source == "hwsd"){
# https://github.com/bluegreen-labs/hwsdr
# Get HWSD soil data. year_start and year_end not required
# TODO replace with {hwsdr} call
# con <- rhwsd::get_hwsd_con()
# ddf <- rhwsd::get_hwsd_siteset(
# x = dplyr::select(siteinfo, sitename, lon, lat),
# con = con, hwsd.bil = settings$fil ) %>%
# dplyr::ungroup() %>%
# dplyr::select(sitename, data) %>%
# tidyr::unnest(data)
} else if (source == "wwf"){
# Get WWF ecoregion data. year_start and year_end not required
ddf <- ingest_globalfields(
siteinfo,
source = source,
dir = dir,
getvars = NULL,
timescale = NULL,
verbose = FALSE,
layer = settings$layer
)
} else if (source == "soilgrids"){
# Get SoilGrids soil data. year_start and year_end not required
# Code from https://git.wur.nl/isric/soilgrids/soilgrids.notebooks/-/blob/master/markdown/xy_info_from_R.md
ddf <- ingest_soilgrids(siteinfo, settings)
} else if (source == "wise"){
# Get WISE30secs soil data. year_start and year_end not required
ddf <- purrr::map(as.list(settings$varnam),
~ingest_wise_byvar(.,
siteinfo,
layer = settings$layer, dir = dir)) %>%
purrr::reduce(left_join, by = c("lon", "lat")) %>%
distinct() %>%
right_join(
dplyr::select(all_of(
siteinfo,
sitename,
lon,
lat
)),
by = c("lon", "lat")) %>%
dplyr::select(-lon, -lat)
} else if (source == "gsde"){
# Get GSDE soil data from tif files (2 files, for bottom and top layers)
ddf <- purrr::map(
as.list(settings$varnam),
~ingest_globalfields(
siteinfo,
source = source,
getvars = NULL,
dir = dir,
timescale = NULL,
verbose = FALSE,
layer = .
)) %>%
map2(as.list(settings$varnam),
~aggregate_layers_gsde(.x, .y, settings$layer)) %>%
purrr::reduce(left_join, by = "sitename")
} else if (source == "worldclim"){
# Get WorldClim data from global raster file
ddf <- ingest_globalfields(
siteinfo,
source = source,
dir = dir,
getvars = NULL,
timescale = NULL,
verbose = FALSE,
layer = settings$varnam
)
ddf <- purrr::map(
as.list(settings$varnam),
~worldclim_pivot_longer(ddf, .)
) |>
purrr::reduce(left_join, by = c("sitename", "month"))
} else {
rlang::warn(paste("you selected source =", source))
stop(
"ingest(): Argument 'source' could not be identified.
Use one of 'fluxnet', 'cru', 'watch_wfdei', 'wfde5',
co2_mlo', 'etopo1', 'stocker23', or 'gee'.")
}
ddf <- ddf %>%
bind_rows() %>%
filter(!is.na(sitename)) %>%
group_by(sitename) %>%
tidyr::nest()
return(ddf)
}
# give each site and day within year the same co2 value
expand_co2_bysite <- function(df, sitename, year_start, year_end, timescale){
ddf <- init_dates_dataframe( year_start, year_end, timescale = timescale) %>%
dplyr::mutate(year = lubridate::year(date)) %>%
dplyr::left_join(
df,
by = "year"
) %>%
dplyr::mutate(sitename = sitename)
return(ddf)
}
expand_bysite <- function(sitename, year_start, year_end, timescale ){
ddf <- init_dates_dataframe( year_start, year_end, timescale = timescale) %>%
dplyr::mutate(sitename = sitename)
return(ddf)
}
aggregate_layers_gsde <- function(df, varnam, use_layer){
# define state variables
var <- above_1 <- above_2 <- above_3 <- above_4 <- above_5 <-
above_6 <- above_7 <- layer <- data <- sitename <-
bottom <- top <- layer <- depth <- depth_tot_cm <-
var <- var_wgt <- NULL
fill_layer_from_above <- function(df, varnam){
df %>%
rename(var = !!varnam) %>%
mutate(above_1 = lag(var),
above_2 = lag(var, n = 2),
above_3 = lag(var, n = 3),
above_4 = lag(var, n = 4),
above_5 = lag(var, n = 5),
above_6 = lag(var, n = 6),
above_7 = lag(var, n = 7)) %>%
mutate(var = ifelse(is.na(var),
ifelse(is.na(above_1),
ifelse(is.na(above_2),
ifelse(is.na(above_3),
ifelse(is.na(above_4),
ifelse(is.na(above_5),
ifelse(is.na(above_6),
ifelse(is.na(above_7),
above_7,
above_7),
above_6),
above_5),
above_4),
above_3),
above_2),
above_1),
var)) %>%
dplyr::select(var, layer) %>%
rename(!!varnam := var)
}
# fill missing values with next available value from layer above
df <- df %>%
group_by(sitename) %>%
tidyr::nest() %>%
mutate(data = purrr::map(data, ~fill_layer_from_above(., varnam))) %>%
tidyr::unnest(data)
df_layers <- tibble(layer = 1:8, bottom = c(4.5, 9.1, 16.6, 28.9, 49.3, 82.9, 138.3, 229.6)) %>%
mutate(top = lag(bottom)) %>%
mutate(top = ifelse(is.na(top), 0, top)) %>%
rowwise() %>%
mutate(depth = bottom - top) %>%
dplyr::select(-top, -bottom)
z_tot_use <- df_layers %>%
ungroup() %>%
dplyr::filter(layer %in% use_layer) %>%
summarise(depth_tot_cm = sum(depth)) %>%
pull(depth_tot_cm)
# weighted sum, weighting by layer depth
df %>%
left_join(df_layers, by = "layer") %>%
rename(var = !!varnam) %>%
dplyr::filter(layer %in% use_layer) %>%
mutate(var_wgt = var * depth / z_tot_use) %>%
group_by(sitename) %>%
summarise(var := sum(var_wgt)) %>%
rename(!!varnam := var)
}
worldclim_pivot_longer <- function(df, varnam){
df |>
# tidyr::unnest(data) |>
dplyr::select(sitename, starts_with(paste0(varnam, "_"))) |>
tidyr::pivot_longer(
cols = starts_with(paste0(varnam, "_")),
names_to = "month",
values_to = varnam,
names_prefix = paste0(varnam, "_")) |>
mutate(month = as.integer(month))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.